svetakvsundhar commented on a change in pull request #15809:
URL: https://github.com/apache/beam/pull/15809#discussion_r740620999
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
             [self._expr, other._expr],
             requires_partition_by=partitionings.Singleton(reason=reason)))
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n
+
+      if n < 3:
+        return float('nan')
+      elif m == 0:
+        return float(0)
Review comment:
> I think m2 can only be 0 if every element is equal.

This is true, since m2 is just the variance (a measure of the spread of the data). In fact, if m2 is 0, I think the unbiased skew will be NaN, based on https://en.wikipedia.org/wiki/Skewness#Sample_skewness and a sample skew calculator I ran.
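To make that concrete, here is a small standalone sketch (illustrative only, not code from this PR; the constant series is a made-up example). With zero variance, the unbiased sample-skewness formula `G1 = g1 * sqrt(n*(n-1)) / (n-2)` hits 0/0, while pandas (in the versions I'm aware of) special-cases `m2 == 0` and returns 0, which is the behaviour the `m == 0` branch above mirrors:

```python
import numpy as np
import pandas as pd

# A constant series: every element equal, so m2 (the variance) is 0.
constant = pd.Series([5.0, 5.0, 5.0, 5.0])

# Unbiased sample skewness by hand: G1 = g1 * sqrt(n*(n-1)) / (n - 2),
# with g1 = m3 / m2**1.5 (see the Wikipedia link above).
n = len(constant)
dev = constant - constant.mean()
m2 = (dev**2).sum() / n
m3 = (dev**3).sum() / n
with np.errstate(invalid='ignore'):
    g1 = m3 / m2**1.5  # 0/0 -> nan when the variance is 0
print(g1 * np.sqrt(n * (n - 1)) / (n - 2))  # nan

# pandas special-cases m2 == 0 and reports 0.0 instead,
# which is what the `m == 0` branch reproduces.
print(constant.skew())  # 0.0
```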
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
             [self._expr, other._expr],
             requires_partition_by=partitionings.Singleton(reason=reason)))
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
Review comment:
I ran some tests to time this. Using the explicit formula (the sum of expectations) makes sense here based on the numbers I saw, even for rather large dataset sizes.
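For reference, a micro-benchmark along those lines might look like the sketch below. The series size and the `number=10` repeat count are arbitrary choices for illustration, not the exact tests run here:

```python
import timeit

import numpy as np
import pandas as pd

# Time the explicit third-central-moment formula used in compute_moments,
# with the built-in skew as a rough point of comparison.
x = pd.Series(np.random.default_rng(0).standard_normal(1_000_000))

explicit = timeit.timeit(lambda: ((x - x.mean())**3).sum(), number=10)
builtin = timeit.timeit(lambda: x.skew(), number=10)
print(f"explicit formula: {explicit / 10:.4f} s per call")
print(f"pd.Series.skew(): {builtin / 10:.4f} s per call")
```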
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
             [self._expr, other._expr],
             requires_partition_by=partitionings.Singleton(reason=reason)))
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
Review comment:
Rewritten; however, I think `third_moment` can stay as is.
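For anyone revisiting this later, the pairwise update that `third_moment` uses can be sanity-checked against a direct computation with a standalone sketch like the one below. The chunk sizes and data are arbitrary; only the combination formula mirrors `combine_moments`:

```python
import numpy as np

def moments(x):
    # Per-chunk statistics in the same scaling as the PR: n, the sum,
    # and the second and third central moments multiplied by n.
    n = len(x)
    dev = x - x.mean()
    return n, x.sum(), (dev**2).sum(), (dev**3).sum()

rng = np.random.default_rng(42)
a = rng.standard_normal(1000)
b = rng.standard_normal(500) + 2.0

n_a, s_a, m_a, t_a = moments(a)
n_b, s_b, m_b, t_b = moments(b)

# Pairwise combination from
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics,
# matching combine_moments above with delta = mean_b - mean_a:
delta = s_b / n_b - s_a / n_a
n = n_a + n_b
t = (t_a + t_b + delta**3 * n_a * n_b * (n_a - n_b) / n**2 +
     3 * delta * (n_a * m_b - n_b * m_a) / n)
m = m_a + m_b + delta**2 * n_a * n_b / n

# Both combined moments agree with computing over the concatenated data.
both = np.concatenate([a, b])
dev = both - both.mean()
print(np.isclose(t, (dev**3).sum()))  # True
print(np.isclose(m, (dev**2).sum()))  # True
```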