TheNeuralBit commented on a change in pull request #15809:
URL: https://github.com/apache/beam/pull/15809#discussion_r740461343
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
[self._expr, other._expr],
requires_partition_by=partitionings.Singleton(reason=reason)))
+ @frame_base.with_docs_from(pd.Series)
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def skew(self, axis, skipna, level, numeric_only, **kwargs):
+ if level is not None:
+ raise NotImplementedError("per-level aggregation")
+ if skipna is None or skipna:
+ self = self.dropna() # pylint: disable=self-cls-assignment
+ # See the online, numerically stable formulae at
+  # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+ def compute_moments(x):
+ n = len(x)
+ if n == 0:
+ m, s, third_moment = 0, 0, 0
+ elif n < 3:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ else:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
Review comment:
This uses the same logic as the `elif` block. I think the intention was
to use pandas' `skew` here since we expect it to be more efficient, right?
Could you also run some microbenchmarks to check whether the pandas `skew`
approach is actually more performant?
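Something along these lines could serve as a starting point (a rough sketch; the array size and repetition count are illustrative, not from the PR):

```python
import timeit

import numpy as np
import pandas as pd

x = pd.Series(np.random.default_rng(0).standard_normal(100_000))

def manual_moments(x):
    # Mirrors the else branch: scaled second moment, sum, count, third moment.
    n = len(x)
    m = x.std(ddof=0)**2 * n
    s = x.sum()
    third_moment = ((x - x.mean())**3).sum()
    return m, s, n, third_moment

print('pandas skew:   ', timeit.timeit(lambda: x.skew(), number=20))
print('manual moments:', timeit.timeit(lambda: manual_moments(x), number=20))
```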
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -1397,7 +1398,6 @@ def test_series_agg(self, agg_method):
'median',
'sem',
'mad',
- 'skew',
Review comment:
There are a few other places where this list is copied; could you make the
change there as well? That should fix the precommit failures.
Alternatively, it could be nice to define the list of non-parallelizable
aggregations in one place as a constant, so it only needs to be updated there.
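For example (a sketch; the constant name and its membership here are illustrative, not taken from the Beam codebase):

```python
# Hypothetical module-level constant shared by the tests, so removing 'skew'
# only needs to happen in one place. The contents are illustrative.
NON_PARALLELIZABLE_AGGREGATIONS = ['quantile', 'describe', 'median', 'sem', 'mad']

def is_non_parallelizable(agg_method):
    # Test parameterizations consult the single shared list instead of copies.
    return agg_method in NON_PARALLELIZABLE_AGGREGATIONS

assert is_non_parallelizable('median')
assert not is_non_parallelizable('skew')
```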
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
[self._expr, other._expr],
requires_partition_by=partitionings.Singleton(reason=reason)))
+ @frame_base.with_docs_from(pd.Series)
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def skew(self, axis, skipna, level, numeric_only, **kwargs):
+ if level is not None:
+ raise NotImplementedError("per-level aggregation")
+ if skipna is None or skipna:
+ self = self.dropna() # pylint: disable=self-cls-assignment
+ # See the online, numerically stable formulae at
+  # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+ def compute_moments(x):
+ n = len(x)
+ if n == 0:
+ m, s, third_moment = 0, 0, 0
+ elif n < 3:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ else:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ return pd.DataFrame(
+ dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+ def combine_moments(data):
+ m = s = n = third_moment = 0.0
+ for datum in data.itertuples():
+ if datum.n == 0:
+ continue
+ elif n == 0:
+ m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+ else:
+ mean_b = s / n
+ mean_a = datum.s / datum.n
+ delta = mean_b - mean_a
+ n_a = datum.n
+ n_b = n
+ combined_n = n + datum.n
+ third_moment += datum.third_moment + (
+ (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+ ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+ m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+ s += datum.s
+ n += datum.n
Review comment:
nit: you're computing `n + datum.n` in several places. Could you just
reference `combined_n` everywhere?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
[self._expr, other._expr],
requires_partition_by=partitionings.Singleton(reason=reason)))
+ @frame_base.with_docs_from(pd.Series)
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def skew(self, axis, skipna, level, numeric_only, **kwargs):
+ if level is not None:
+ raise NotImplementedError("per-level aggregation")
+ if skipna is None or skipna:
+ self = self.dropna() # pylint: disable=self-cls-assignment
+ # See the online, numerically stable formulae at
+  # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+ def compute_moments(x):
+ n = len(x)
+ if n == 0:
+ m, s, third_moment = 0, 0, 0
+ elif n < 3:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ else:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ return pd.DataFrame(
+ dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+ def combine_moments(data):
+ m = s = n = third_moment = 0.0
+ for datum in data.itertuples():
+ if datum.n == 0:
+ continue
+ elif n == 0:
+ m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+ else:
+ mean_b = s / n
+ mean_a = datum.s / datum.n
+ delta = mean_b - mean_a
+ n_a = datum.n
+ n_b = n
+ combined_n = n + datum.n
+ third_moment += datum.third_moment + (
+ (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+ ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+ m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+ s += datum.s
+ n += datum.n
+
+ if n < 3:
+ return float('nan')
+ elif m == 0:
+ return float(0)
+ else:
+ return combined_n * math.sqrt(combined_n - 1) / (combined_n -
+ 2) * third_moment / (
+ m**(3 / 2))
Review comment:
Could you add a comment here noting that this scaling produces the
_unbiased_ (aka sample) skewness, which is why it differs slightly from the
equation at
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics,
which produces the biased (aka population) skew.
You might also add a link to
https://en.wikipedia.org/wiki/Skewness#Sample_skewness, which has definitions
for each.
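As a sanity check of that scaling (a sketch that just compares the PR's final expression against pandas' sample skewness on one random series):

```python
import math

import numpy as np
import pandas as pd

x = pd.Series(np.random.default_rng(0).standard_normal(1_000))
n = len(x)
m = x.std(ddof=0)**2 * n              # sum of squared deviations (M2)
third_moment = ((x - x.mean())**3).sum()

# Population skew g1 = (M3/n) / (M2/n)**1.5 times the small-sample correction
# sqrt(n*(n-1)) / (n-2) gives the unbiased sample skew G1, which pandas uses.
result = n * math.sqrt(n - 1) / (n - 2) * third_moment / m**(3 / 2)
assert math.isclose(result, x.skew(), rel_tol=1e-6)
```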
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
[self._expr, other._expr],
requires_partition_by=partitionings.Singleton(reason=reason)))
+ @frame_base.with_docs_from(pd.Series)
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def skew(self, axis, skipna, level, numeric_only, **kwargs):
+ if level is not None:
+ raise NotImplementedError("per-level aggregation")
+ if skipna is None or skipna:
+ self = self.dropna() # pylint: disable=self-cls-assignment
+ # See the online, numerically stable formulae at
+  # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+ def compute_moments(x):
+ n = len(x)
+ if n == 0:
+ m, s, third_moment = 0, 0, 0
+ elif n < 3:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ else:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ return pd.DataFrame(
+ dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+ def combine_moments(data):
+ m = s = n = third_moment = 0.0
+ for datum in data.itertuples():
+ if datum.n == 0:
+ continue
+ elif n == 0:
+ m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+ else:
+ mean_b = s / n
+ mean_a = datum.s / datum.n
+ delta = mean_b - mean_a
+ n_a = datum.n
+ n_b = n
+ combined_n = n + datum.n
+ third_moment += datum.third_moment + (
+ (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+ ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+ m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+ s += datum.s
+ n += datum.n
+
+ if n < 3:
+ return float('nan')
+ elif m == 0:
+ return float(0)
Review comment:
Is it always true that skew is 0 if the second moment is 0? I'm
wondering what happens if m2 is 0 and m3 is non-zero. Perhaps that's not
possible. I think m2 can only be 0 if every element is equal. Does that seem
right @robertwb?
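For what it's worth, a quick illustration of the "every element is equal" case (if the sum of squared deviations is 0, each centered value is 0, so the third moment must be 0 too; the constant 2.5 is chosen to be exactly representable in binary):

```python
import numpy as np

x = np.full(5, 2.5)                  # a constant series: m2 is exactly 0
centered = x - x.mean()
m2 = (centered**2).sum()
m3 = (centered**3).sum()
assert m2 == 0.0 and m3 == 0.0       # m2 == 0 forces every centered term to 0
```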
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
[self._expr, other._expr],
requires_partition_by=partitionings.Singleton(reason=reason)))
+ @frame_base.with_docs_from(pd.Series)
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def skew(self, axis, skipna, level, numeric_only, **kwargs):
+ if level is not None:
+ raise NotImplementedError("per-level aggregation")
+ if skipna is None or skipna:
+ self = self.dropna() # pylint: disable=self-cls-assignment
+ # See the online, numerically stable formulae at
+  # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+ def compute_moments(x):
+ n = len(x)
+ if n == 0:
+ m, s, third_moment = 0, 0, 0
+ elif n < 3:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ else:
+ m = x.std(ddof=0)**2 * n
+ s = x.sum()
+ third_moment = (((x - x.mean())**3).sum())
+ return pd.DataFrame(
+ dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+ def combine_moments(data):
+ m = s = n = third_moment = 0.0
+ for datum in data.itertuples():
+ if datum.n == 0:
+ continue
+ elif n == 0:
+ m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+ else:
+ mean_b = s / n
+ mean_a = datum.s / datum.n
+ delta = mean_b - mean_a
+ n_a = datum.n
+ n_b = n
Review comment:
I think defining `n_a` and `n_b` here just confuses things, for a couple
of reasons:
1. You're not using them everywhere (the `m` calculation still references
the original names).
2. You're only doing it for `n`. What about the other variables that are
being accumulated (`s`, `m`, `third_moment`)?
I think this code should either a) define variables like this for everything
that's being accumulated and reference them everywhere, or b) just reference
the original names. It's up to you which you'd prefer. I think (a) improves
clarity substantially, but at the cost of some performance.
Speaking of clarity, maybe `m` should be called `second_moment`, or `m2`, to
distinguish it from the third moment?
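A sketch of what option (a) might look like, with symmetric `_a`/`_b` names for every accumulated quantity and `m` renamed to `m2` (the names are suggestions, not the PR's code; the formulae are the pairwise ones from the Wikipedia page linked above):

```python
def combine(m2_a, s_a, n_a, m3_a, m2_b, s_b, n_b, m3_b):
    # Symmetric pairwise combine of two partitions' moment summaries.
    combined_n = n_a + n_b
    delta = s_b / n_b - s_a / n_a    # difference of the two means
    m3 = m3_a + m3_b + (
        delta**3 * n_a * n_b * (n_a - n_b) / combined_n**2 +
        3 * delta * (n_a * m2_b - n_b * m2_a) / combined_n)
    m2 = m2_a + m2_b + delta**2 * n_a * n_b / combined_n
    s = s_a + s_b
    return m2, s, combined_n, m3
```

Combining summaries of two halves of a sample this way reproduces the moments of the whole sample, which is what the test below checks.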
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]