[ 
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=498641&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498641
 ]

ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/20 16:20
            Start Date: 09/Oct/20 16:20
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #12982:
URL: https://github.com/apache/beam/pull/12982#discussion_r502197895



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -150,8 +115,72 @@ def combine_co_moments(data, std_x, std_y):
           expressions.ComputedExpression(
               'corr',
               lambda df,
-              other: df.corr(other, method=method, DataFrame=min_periods)[
-                  self._expr, other._expr],
+              other: df.corr(other, method=method, min_periods=min_periods),
+              [self._expr, other._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  def _corr_aligned(self, other, method, min_periods):
+    std_x = self.std()
+    std_y = other.std()
+    cov = self._cov_aligned(other, min_periods)
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'normalize',
+              lambda cov,
+              std_x,
+              std_y: cov / (std_x * std_y),
+              [cov._expr, std_x._expr, std_y._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def cov(self, other, min_periods, ddof):
+    x, y = self.dropna().align(other.dropna(), 'inner')
+    return x._cov_aligned(y, min_periods, ddof)
+
+  def _cov_aligned(self, other, min_periods, ddof=1):
+    # Use the formulae from
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+    def compute_co_moments(x, y):
+      n = len(x)
+      if n <= 1:
+        c = 0
+      else:
+        c = x.corr(y) * x.std() * y.std() * (n - 1)
+      sx = x.sum()
+      sy = y.sum()
+      return pd.DataFrame(dict(c=[c], sx=[sx], sy=[sy], n=[n]))
+
+    def combine_co_moments(data):
+      c = sx = sy = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          c, sx, sy, n = datum.c, datum.sx, datum.sy, datum.n
+        else:
+          c += (
+              datum.c + (sx / n - datum.sx / datum.n) *
+              (sy / n - datum.sy / datum.n) * n * datum.n / (n + datum.n))
+          sx += datum.sx
+          sy += datum.sy
+          n += datum.n
+      if n < max(2, min_periods or 0):
+        return float('nan')

Review comment:
       Yes. Done.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance

Review comment:
       Done. Even if the subheading change, they're descriptive enough to 
identify what is meant.

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -36,7 +36,7 @@ def _run_test(self, func, *args):
             expressions.ConstantExpression(arg, arg[0:0])) for arg in args
     ]
     expected = func(*args)
-    actual = expressions.PartitioningSession({}).evaluate(
+    actual = expressions.Session({}).evaluate(

Review comment:
       Some of the other tests in this file were relying on the ordering. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -150,8 +115,72 @@ def combine_co_moments(data, std_x, std_y):
           expressions.ComputedExpression(
               'corr',
               lambda df,
-              other: df.corr(other, method=method, DataFrame=min_periods)[
-                  self._expr, other._expr],
+              other: df.corr(other, method=method, min_periods=min_periods),
+              [self._expr, other._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  def _corr_aligned(self, other, method, min_periods):
+    std_x = self.std()
+    std_y = other.std()
+    cov = self._cov_aligned(other, min_periods)
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'normalize',
+              lambda cov,
+              std_x,
+              std_y: cov / (std_x * std_y),
+              [cov._expr, std_x._expr, std_y._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def cov(self, other, min_periods, ddof):
+    x, y = self.dropna().align(other.dropna(), 'inner')
+    return x._cov_aligned(y, min_periods, ddof)
+
+  def _cov_aligned(self, other, min_periods, ddof=1):
+    # Use the formulae from
+    # 
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+    def compute_co_moments(x, y):
+      n = len(x)
+      if n <= 1:
+        c = 0
+      else:
+        c = x.corr(y) * x.std() * y.std() * (n - 1)

Review comment:
       Good point. Done.

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -36,7 +36,7 @@ def _run_test(self, func, *args):
             expressions.ConstantExpression(arg, arg[0:0])) for arg in args
     ]
     expected = func(*args)
-    actual = expressions.PartitioningSession({}).evaluate(
+    actual = expressions.Session({}).evaluate(

Review comment:
       I've made this an option, we can try to switch it in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 498641)
    Time Spent: 21h 50m  (was: 21h 40m)

> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
>                 Key: BEAM-9547
>                 URL: https://issues.apache.org/jira/browse/BEAM-9547
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Robert Bradshaw
>            Priority: P2
>          Time Spent: 21h 50m
>  Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy 
> method. Everything that's not actually implemented should get a default 
> implementation that raises WontImplementError
> SeeĀ https://github.com/apache/beam/pull/10757#discussion_r389132292
> Progress at the individual operation level is tracked in a 
> [spreadsheet|https://docs.google.com/spreadsheets/d/1hHAaJ0n0k2tw465ORs5tfdy4Lg0DnGWIQ53cLjAhel0/edit],
>  consider requesting edit access if you'd like to help out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to