rohdesamuel commented on a change in pull request #14438:
URL: https://github.com/apache/beam/pull/14438#discussion_r616075863
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -922,33 +923,79 @@ def dropna(self, **kwargs):
to_string = frame_base.wont_implement_method(
pd.Series, 'to_string', reason="non-deferred-result")
- def aggregate(self, func, axis=0, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def aggregate(self, func, axis, *args, **kwargs):
Review comment:
Should `axis` still default to 0, as it did before this change?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -922,33 +923,79 @@ def dropna(self, **kwargs):
to_string = frame_base.wont_implement_method(
pd.Series, 'to_string', reason="non-deferred-result")
- def aggregate(self, func, axis=0, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def aggregate(self, func, axis, *args, **kwargs):
+ if kwargs.get('skipna', False):
+ # Eagerly generate a proxy to make sure skipna is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().aggregate(func, axis, *args, **kwargs)
+ kwargs.pop('skipna')
+ self.dropna().aggregate(func, axis, *args, **kwargs)
+
if isinstance(func, list) and len(func) > 1:
- # Aggregate each column separately, then stick them all together.
+ # level arg is ignored for multiple aggregations
+ _ = kwargs.pop('level', None)
+
+ # Aggregate with each method separately, then stick them all together.
rows = [self.agg([f], *args, **kwargs) for f in func]
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *rows: pd.concat(rows), [row._expr for row in rows]))
else:
- # We're only handling a single column.
+ # We're only handling a single column. It could be 'func' or ['func'],
+ # which produce different results. 'func' produces a scalar, ['func']
+ # produces a single element Series.
base_func = func[0] if isinstance(func, list) else func
- if _is_associative(base_func) and not args and not kwargs:
+
+ if (_is_numeric(base_func) and
+ not pd.core.dtypes.common.is_numeric_dtype(self.dtype)):
+ warnings.warn(
+ f"Performing a numeric aggregation, {base_func!r}, on "
+ f"Series {self._expr.proxy().name!r} with non-numeric type "
+ f"{self.dtype!r}. This can result in runtime errors or surprising "
+ "results.")
+
+ if 'level' in kwargs:
+ # Defer to groupby.agg for level= mode
+ return self.groupby(
+ level=kwargs.pop('level'), axis=axis).agg(func, *args, **kwargs)
+
+ requires_singleton = False
+ if 'min_count' in kwargs:
+ # Eagerly generate a proxy to make sure min_count is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
Review comment:
What happens when min_count isn't a valid argument for this aggregation? Should that
exception be caught here and reported with an error message specific to the aggregate method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]