TheNeuralBit commented on a change in pull request #12982:
URL: https://github.com/apache/beam/pull/12982#discussion_r501310799



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (

Review comment:
       nit: consider defining a variable for `delta` to make this easier to 
relate to the formula in 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -36,7 +36,7 @@ def _run_test(self, func, *args):
             expressions.ConstantExpression(arg, arg[0:0])) for arg in args
     ]
     expected = func(*args)
-    actual = expressions.PartitioningSession({}).evaluate(
+    actual = expressions.Session({}).evaluate(

Review comment:
       Why revert this?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance

Review comment:
       Can this link directly to 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
 and/or 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (
+              n + datum.n)
+          s += datum.s
+          n += datum.n
+      if n <= ddof:
+        return float('nan')
+      else:
+        return math.sqrt(m / (n - ddof))
+
+    moments = expressions.ComputedExpression(
+        'compute_moments',
+        compute_moments, [self._expr],
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'combine_moments',
+              combine_moments, [moments],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def corr(self, other, method, min_periods):
+    if method == 'pearson':  # Note that this is the default.
+      x = self.dropna()
+      y = other.dropna()
+
+      # Do this first to filter to the entries that are present on both sides.
+      def join(x, y):
+        return pd.concat([x, y], axis=1, join='inner').rename(
+            lambda c: 'xy'[c], axis=1)
+
+      # Use the formulae from
+      # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+      def compute_co_moments(x, y):
+        n = len(x)
+        if n <= 1:
+          c = 0
+        else:
+          c = x.corr(y) * x.std() * y.std() * (n - 1)
+        sx = x.sum()
+        sy = y.sum()
+        return pd.DataFrame(dict(c=[c], sx=[sx], sy=[sy], n=[n]))
+
+      def combine_co_moments(data, std_x, std_y):
+        c = sx = sy = n = 0.0
+        for datum in data.itertuples():
+          if datum.n == 0:
+            continue
+          elif n == 0:
+            c, sx, sy, n = datum.c, datum.sx, datum.sy, datum.n
+          else:
+            c += (
+                datum.c + (sx / n - datum.sx / datum.n) *
+                (sy / n - datum.sy / datum.n) * n * datum.n / (n + datum.n))
+            sx += datum.sx
+            sy += datum.sy
+            n += datum.n
+        if n < max(2, min_periods or 0):
+          return float('nan')
+        else:
+          return c / (n - 1) / std_x / std_y
+
+      joined = frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join',
+              join, [x._expr, y._expr],
+              requires_partition_by=partitionings.Index()))
+      std_x = joined.x.std()
+      std_y = joined.y.std()
+
+      moments = expressions.ComputedExpression(
+          'compute_co_moments',
+          compute_co_moments, [joined.x._expr, joined.y._expr])
+
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'comnine_co_moments',
+                combine_co_moments, [moments, std_x._expr, std_y._expr],
+                requires_partition_by=partitionings.Singleton()))
+
+    else:
+      # The rank-based correlations are not obviously parallelizable, though
+      # perhaps an approximation could be done with a knowledge of quantiles
+      # and custom partitioning.
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'corr',
+              lambda df,
+              other: df.corr(other, method=method, DataFrame=min_periods)[

Review comment:
       ```suggestion
                 other: df.corr(other, method=method, min_periods=min_periods)[
   ```

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -150,8 +115,72 @@ def combine_co_moments(data, std_x, std_y):
           expressions.ComputedExpression(
               'corr',
               lambda df,
-              other: df.corr(other, method=method, DataFrame=min_periods)[
-                  self._expr, other._expr],
+              other: df.corr(other, method=method, min_periods=min_periods),
+              [self._expr, other._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  def _corr_aligned(self, other, method, min_periods):
+    std_x = self.std()
+    std_y = other.std()
+    cov = self._cov_aligned(other, min_periods)
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'normalize',
+              lambda cov,
+              std_x,
+              std_y: cov / (std_x * std_y),
+              [cov._expr, std_x._expr, std_y._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def cov(self, other, min_periods, ddof):
+    x, y = self.dropna().align(other.dropna(), 'inner')
+    return x._cov_aligned(y, min_periods, ddof)
+
+  def _cov_aligned(self, other, min_periods, ddof=1):
+    # Use the formulae from
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+    def compute_co_moments(x, y):
+      n = len(x)
+      if n <= 1:
+        c = 0
+      else:
+        c = x.corr(y) * x.std() * y.std() * (n - 1)

Review comment:
       nit: this could become `x.cov(y) * (n-1)` which makes this more easily 
relatable to the wiki link

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (
+              n + datum.n)
+          s += datum.s
+          n += datum.n
+      if n <= ddof:
+        return float('nan')
+      else:
+        return math.sqrt(m / (n - ddof))
+
+    moments = expressions.ComputedExpression(
+        'compute_moments',
+        compute_moments, [self._expr],
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'combine_moments',
+              combine_moments, [moments],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def corr(self, other, method, min_periods):
+    if method == 'pearson':  # Note that this is the default.
+      x = self.dropna()
+      y = other.dropna()
+
+      # Do this first to filter to the entries that are present on both sides.
+      def join(x, y):
+        return pd.concat([x, y], axis=1, join='inner').rename(
+            lambda c: 'xy'[c], axis=1)
+
+      # Use the formulae from

Review comment:
       It would help to be more specific here, I spent a while trying to find 
references. It looks like `combine_co_moments`  is the formula for combining 
covariance from two sets hidden at the bottom of the [Online Covariance 
section](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online).
   
   The other critical piece is the translation between co-moment and pearson 
correlation coefficient. Is there something we can reference for that? It seems 
to follow from the last definition 
[here](https://en.wikipedia.org/wiki/Pearson_correlation_coefficient#For_a_sample).
 IIUC the co-moment is the numerator in that definition, and that fits with 
your code.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (
+              n + datum.n)
+          s += datum.s
+          n += datum.n
+      if n <= ddof:
+        return float('nan')
+      else:
+        return math.sqrt(m / (n - ddof))
+
+    moments = expressions.ComputedExpression(
+        'compute_moments',
+        compute_moments, [self._expr],
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'combine_moments',
+              combine_moments, [moments],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def corr(self, other, method, min_periods):
+    if method == 'pearson':  # Note that this is the default.
+      x = self.dropna()
+      y = other.dropna()
+
+      # Do this first to filter to the entries that are present on both sides.
+      def join(x, y):
+        return pd.concat([x, y], axis=1, join='inner').rename(
+            lambda c: 'xy'[c], axis=1)
+
+      # Use the formulae from
+      # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+      def compute_co_moments(x, y):
+        n = len(x)
+        if n <= 1:
+          c = 0
+        else:
+          c = x.corr(y) * x.std() * y.std() * (n - 1)
+        sx = x.sum()
+        sy = y.sum()
+        return pd.DataFrame(dict(c=[c], sx=[sx], sy=[sy], n=[n]))
+
+      def combine_co_moments(data, std_x, std_y):
+        c = sx = sy = n = 0.0
+        for datum in data.itertuples():
+          if datum.n == 0:
+            continue
+          elif n == 0:
+            c, sx, sy, n = datum.c, datum.sx, datum.sy, datum.n
+          else:
+            c += (
+                datum.c + (sx / n - datum.sx / datum.n) *
+                (sy / n - datum.sy / datum.n) * n * datum.n / (n + datum.n))
+            sx += datum.sx
+            sy += datum.sy
+            n += datum.n
+        if n < max(2, min_periods or 0):
+          return float('nan')
+        else:
+          return c / (n - 1) / std_x / std_y
+
+      joined = frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join',
+              join, [x._expr, y._expr],
+              requires_partition_by=partitionings.Index()))
+      std_x = joined.x.std()
+      std_y = joined.y.std()
+
+      moments = expressions.ComputedExpression(
+          'compute_co_moments',
+          compute_co_moments, [joined.x._expr, joined.y._expr])
+
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'comnine_co_moments',
+                combine_co_moments, [moments, std_x._expr, std_y._expr],
+                requires_partition_by=partitionings.Singleton()))
+
+    else:
+      # The rank-based correlations are not obviously parallelizable, though
+      # perhaps an approximation could be done with a knowledge of quantiles
+      # and custom partitioning.
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'corr',
+              lambda df,
+              other: df.corr(other, method=method, DataFrame=min_periods)[

Review comment:
       Nevermind looks like you corrected this later

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -150,8 +115,72 @@ def combine_co_moments(data, std_x, std_y):
           expressions.ComputedExpression(
               'corr',
               lambda df,
-              other: df.corr(other, method=method, DataFrame=min_periods)[
-                  self._expr, other._expr],
+              other: df.corr(other, method=method, min_periods=min_periods),
+              [self._expr, other._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  def _corr_aligned(self, other, method, min_periods):
+    std_x = self.std()
+    std_y = other.std()
+    cov = self._cov_aligned(other, min_periods)
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'normalize',
+              lambda cov,
+              std_x,
+              std_y: cov / (std_x * std_y),
+              [cov._expr, std_x._expr, std_y._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def cov(self, other, min_periods, ddof):
+    x, y = self.dropna().align(other.dropna(), 'inner')
+    return x._cov_aligned(y, min_periods, ddof)
+
+  def _cov_aligned(self, other, min_periods, ddof=1):
+    # Use the formulae from
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+    def compute_co_moments(x, y):
+      n = len(x)
+      if n <= 1:
+        c = 0
+      else:
+        c = x.corr(y) * x.std() * y.std() * (n - 1)
+      sx = x.sum()
+      sy = y.sum()
+      return pd.DataFrame(dict(c=[c], sx=[sx], sy=[sy], n=[n]))
+
+    def combine_co_moments(data):
+      c = sx = sy = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          c, sx, sy, n = datum.c, datum.sx, datum.sy, datum.n
+        else:
+          c += (
+              datum.c + (sx / n - datum.sx / datum.n) *
+              (sy / n - datum.sy / datum.n) * n * datum.n / (n + datum.n))
+          sx += datum.sx
+          sy += datum.sy
+          n += datum.n
+      if n < max(2, min_periods or 0):
+        return float('nan')

Review comment:
       Should this also check against ddof like std?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to