rohdesamuel commented on a change in pull request #14438:
URL: https://github.com/apache/beam/pull/14438#discussion_r616235214
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -922,33 +923,79 @@ def dropna(self, **kwargs):
to_string = frame_base.wont_implement_method(
pd.Series, 'to_string', reason="non-deferred-result")
- def aggregate(self, func, axis=0, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def aggregate(self, func, axis, *args, **kwargs):
+ if kwargs.get('skipna', False):
+ # Eagerly generate a proxy to make sure skipna is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().aggregate(func, axis, *args, **kwargs)
+ kwargs.pop('skipna')
+ self.dropna().aggregate(func, axis, *args, **kwargs)
+
if isinstance(func, list) and len(func) > 1:
- # Aggregate each column separately, then stick them all together.
+ # level arg is ignored for multiple aggregations
+ _ = kwargs.pop('level', None)
+
+ # Aggregate with each method separately, then stick them all together.
rows = [self.agg([f], *args, **kwargs) for f in func]
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *rows: pd.concat(rows), [row._expr for row in rows]))
else:
- # We're only handling a single column.
+ # We're only handling a single column. It could be 'func' or ['func'],
+ # which produce different results. 'func' produces a scalar, ['func']
+ # produces a single element Series.
base_func = func[0] if isinstance(func, list) else func
- if _is_associative(base_func) and not args and not kwargs:
+
+ if (_is_numeric(base_func) and
+ not pd.core.dtypes.common.is_numeric_dtype(self.dtype)):
+ warnings.warn(
+ f"Performing a numeric aggregation, {base_func!r}, on "
+ f"Series {self._expr.proxy().name!r} with non-numeric type "
+ f"{self.dtype!r}. This can result in runtime errors or surprising "
+ "results.")
+
+ if 'level' in kwargs:
+ # Defer to groupby.agg for level= mode
+ return self.groupby(
+ level=kwargs.pop('level'), axis=axis).agg(func, *args, **kwargs)
+
+ requires_singleton = False
+ if 'min_count' in kwargs:
+ # Eagerly generate a proxy to make sure min_count is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
Review comment:
I see, that makes sense. It's very thoughtful to surface the same
exception that Pandas raises.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org