TheNeuralBit commented on a change in pull request #12982:
URL: https://github.com/apache/beam/pull/12982#discussion_r501362686



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (
+              n + datum.n)
+          s += datum.s
+          n += datum.n
+      if n <= ddof:
+        return float('nan')
+      else:
+        return math.sqrt(m / (n - ddof))
+
+    moments = expressions.ComputedExpression(
+        'compute_moments',
+        compute_moments, [self._expr],
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'combine_moments',
+              combine_moments, [moments],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def corr(self, other, method, min_periods):
+    if method == 'pearson':  # Note that this is the default.
+      x = self.dropna()
+      y = other.dropna()
+
+      # Do this first to filter to the entries that are present on both sides.
+      def join(x, y):
+        return pd.concat([x, y], axis=1, join='inner').rename(
+            lambda c: 'xy'[c], axis=1)
+
+      # Use the formulae from

Review comment:
      I think the reference to the Pearson correlation coefficient is no longer 
necessary now that this logic has moved to `_cov_aligned`, which can be understood 
just from https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online. 
It would still be helpful for this comment to be more specific, though.
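
For reference, a minimal standalone sketch of the split/combine approach this diff uses, so it can be run and checked outside the deferred-frame machinery. The bodies of `compute_moments` and `combine_moments` mirror the diff; the two-partition split, the random data, and the final check against pandas' `Series.std()` are illustrative additions of mine, and the pairwise update is the one described at https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm:

```python
import math

import numpy as np
import pandas as pd


def compute_moments(x):
  # Per-partition summary: the count n, the sum s, and
  # m = n * population variance = sum of squared deviations
  # from the partition mean.
  n = len(x)
  m = x.std(ddof=0)**2 * n
  s = x.sum()
  return pd.DataFrame(dict(m=[m], s=[s], n=[n]))


def combine_moments(data, ddof=1):
  # Merge the per-partition moments pairwise using the parallel update
  # from the Wikipedia article linked above.
  m = s = n = 0.0
  for datum in data.itertuples():
    if datum.n == 0:
      continue
    elif n == 0:
      m, s, n = datum.m, datum.s, datum.n
    else:
      m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (
          n + datum.n)
      s += datum.s
      n += datum.n
  if n <= ddof:
    return float('nan')
  return math.sqrt(m / (n - ddof))


# Illustrative check (not from the PR): splitting a Series into two
# partitions and combining their moments reproduces pandas' Series.std().
series = pd.Series(np.random.default_rng(0).normal(size=100))
partitions = [series.iloc[:37], series.iloc[37:]]
moments = pd.concat(
    [compute_moments(p) for p in partitions], ignore_index=True)
assert math.isclose(combine_moments(moments), series.std())
```

The same pairwise update extends to co-moments (sums of products of deviations from the two means), which is presumably how `_cov_aligned` computes the covariance and, from it, the Pearson correlation.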



