robertwb commented on a change in pull request #14438:
URL: https://github.com/apache/beam/pull/14438#discussion_r628430152



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -2321,18 +2410,23 @@ def __getitem__(self, name):
         self._grouping_indexes,
         projection=name)
 
-  def agg(self, fn):
-    if not callable(fn):
-      # TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
-      # running doctests for pandas.core.groupby.generic
-      raise NotImplementedError('GroupBy.agg currently only supports callable '
-                                'arguments')
-    return DeferredDataFrame(
-        expressions.ComputedExpression(
-            'agg',
-            lambda gb: gb.agg(fn), [self._expr],
-            requires_partition_by=partitionings.Index(),
-            preserves_partition_by=partitionings.Singleton()))
+  def agg(self, fn, *args, **kwargs):
+    if callable(fn):

Review comment:
       Do we want things like np.sum to take this path, or should we put this 
case last?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -948,41 +949,87 @@ def dropna(self, **kwargs):
   to_string = frame_base.wont_implement_method(
       pd.Series, 'to_string', reason="non-deferred-result")
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def aggregate(self, func, axis, *args, **kwargs):
+    if kwargs.get('skipna', False):
+      # Eagerly generate a proxy to make sure skipna is a valid argument
+      # for this aggregation method
+      _ = self._expr.proxy().aggregate(func, axis, *args, **kwargs)
+      kwargs.pop('skipna')
+      self.dropna().aggregate(func, axis, *args, **kwargs)

Review comment:
       Was the intent to return this? 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -922,33 +923,79 @@ def dropna(self, **kwargs):
   to_string = frame_base.wont_implement_method(
       pd.Series, 'to_string', reason="non-deferred-result")
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def aggregate(self, func, axis, *args, **kwargs):

Review comment:
       Yep, that's exactly right. It both makes our intent clear here and 
guarantees the defaults line up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to