TheNeuralBit commented on a change in pull request #15809:
URL: https://github.com/apache/beam/pull/15809#discussion_r740461343



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())

Review comment:
       This uses the same logic as the `elif` block. I think the intention is 
to use the pandas `skew` here since we think it is more efficient, right?
   
   Could you also run some microbenchmarks to see if the approach with pandas 
`skew` actually is more performant?

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -1397,7 +1398,6 @@ def test_series_agg(self, agg_method):
         'median',
         'sem',
         'mad',
-        'skew',

Review comment:
       There are a few other places where this is copied, can you make the 
change there as well? That should fix the precommit failures
   
   Alternatively, it could be nice to have the list of non parallelizable 
aggregations defined in one place in a constant, then just update it there.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n

Review comment:
       nit: you're computing `n + datum.n` in several places. Ccould you just 
reference `combined_n` everywhere?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n
+
+      if n < 3:
+        return float('nan')
+      elif m == 0:
+        return float(0)
+      else:
+        return combined_n * math.sqrt(combined_n - 1) / (combined_n -
+                                                         2) * third_moment / (
+                                                             m**(3 / 2))

Review comment:
       Could you add a comment here that this is scaling for _unbiased_ (aka 
sample) skewness, which is the reason it's slightly different from the equation 
at 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics,
 which is producing biased (aka population) skew.
   
   You might also add a link to 
https://en.wikipedia.org/wiki/Skewness#Sample_skewness which has definitions 
for each.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n
+
+      if n < 3:
+        return float('nan')
+      elif m == 0:
+        return float(0)

Review comment:
       Is it always true that skew is 0 if the second moment is 0? I'm 
wondering what happens if m2 is 0 and m3 is non-zero. Perhaps that's not 
possible. I think m2 can only be 0 if every element is equal. Does that seem 
right @robertwb?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n

Review comment:
       I think defining `n_a` and `n_b` here just confuses things, for a couple 
of reasons:
   1. You're not using them everywhere (the `m` calculation still references 
the original names).
   2. You're only doing it for `n`. What about the other variables that are 
being accumulated (`s`, `m`, `third_moment`)?
   
   I think this code should either a) define variables like this for everything 
that's being accumulated, and reference them everyhwere, or b) just reference 
the original names. It's up to you which you'd prefer. I think (a) improved 
clarity substantially, but at the cost of some performance.
   
   Speaking of clarity, maybe `m` should be called `second_moment`, or `m2`, to 
distinguish it from the third moment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to