mohamedawnallah commented on code in PR #33672:
URL: https://github.com/apache/beam/pull/33672#discussion_r1938475137
##########
sdks/python/apache_beam/dataframe/frames.py:
##########
@@ -4696,6 +4698,43 @@ def _check_str_or_np_builtin(agg_func, func_list):
       getattr(agg_func, '__name__', None) in func_list
       and agg_func.__module__ in ('numpy', 'builtins'))
+def _agg_with_no_function(expr, **kwargs):
+  if not kwargs:
+    raise ValueError("No aggregation functions specified")
+
+  # Handle dictionary-like input for aggregation.
+  result_columns, result_frames = [], []
+  for col_name, (input_col, agg_fn) in kwargs.items():
+    if not callable(agg_fn) and not isinstance(agg_fn, str):
+      raise ValueError(
+          f"Invalid aggregation function for column {col_name}: {agg_fn!r}"
+      )
+
+    # Create a ComputedExpression for each column aggregation and wrap it in
+    # a DeferredDataFrame.
+    result_columns.append(col_name)
+    result_frames.append(
+        DeferredDataFrame(
+            expressions.ComputedExpression(
+                f"agg_{col_name}",
+                lambda gb, ic=input_col, af=agg_fn: gb[ic].agg(af),
Review Comment:
> why this logic doesn't require branching like the one we have above:

That's a good observation! Here the branching is effectively delegated to pandas on the aggregation side:
```python
lambda gb, ic=input_col, af=agg_fn: gb[ic].agg(af)
```
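where `gb` is a `pandas.core.groupby.generic.DataFrameGroupBy`, so `gb[ic]` is a `SeriesGroupBy` whose `agg` accepts both string names and callables.
Still, applying the same branching logic as Beam DataFrame's `DeferredGroupBy.agg`
would help ensure consistent results across both code paths. 👍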
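To make the suggestion above concrete, here is a minimal, self-contained sketch (plain Python, no Beam or pandas needed) of per-column dispatch for the named-aggregation path. `_check_str_or_np_builtin` mirrors the helper in the hunk's context lines, though its first line is assumed; `LIFTABLE`, `UNLIFTABLE`, `classify_agg_fn`, and `classify_named_aggs` are hypothetical names, and the category lists are illustrative, not Beam's actual ones. It only decides which branch each `(input_col, agg_fn)` pair would take; it does not build Beam expressions.

```python
from typing import Callable, Union

AggFn = Union[str, Callable]

# Hypothetical category lists, for illustration only; Beam's real lists of
# liftable and unliftable aggregations may differ.
LIFTABLE = ['sum', 'max', 'min', 'all', 'any']
UNLIFTABLE = ['median', 'quantile', 'nunique']


def _check_str_or_np_builtin(agg_func, func_list):
  # Mirrors the helper in the hunk's context lines (first line assumed):
  # matches the string name itself, or a numpy/builtin function whose
  # __name__ is in func_list.
  return agg_func in func_list or (
      getattr(agg_func, '__name__', None) in func_list
      and agg_func.__module__ in ('numpy', 'builtins'))


def classify_agg_fn(agg_fn: AggFn) -> str:
  """Hypothetical: pick the branch a DeferredGroupBy.agg-style dispatcher
  might take, instead of handing agg_fn straight to pandas."""
  if not callable(agg_fn) and not isinstance(agg_fn, str):
    raise ValueError(f"Invalid aggregation function: {agg_fn!r}")
  if _check_str_or_np_builtin(agg_fn, LIFTABLE):
    return 'liftable'  # could be pre-aggregated per partition, then combined
  if _check_str_or_np_builtin(agg_fn, UNLIFTABLE):
    return 'unliftable'  # needs all values of a group in one place
  if isinstance(agg_fn, str):
    return 'named_method'  # e.g. resolved via getattr on the groupby
  return 'callable'  # arbitrary user function, treated as opaque


def classify_named_aggs(**kwargs):
  """Hypothetical: classify every output column of a named aggregation
  (output_col=(input_col, agg_fn)) with the same rules as the
  single-function path."""
  if not kwargs:
    raise ValueError("No aggregation functions specified")
  return {
      col_name: classify_agg_fn(agg_fn)
      for col_name, (_input_col, agg_fn) in kwargs.items()
  }


print(classify_named_aggs(
    total=('b', 'sum'),
    top=('b', max),
    mid=('b', 'median'),
    spread=('b', lambda s: s.max() - s.min())))
# {'total': 'liftable', 'top': 'liftable', 'mid': 'unliftable', 'spread': 'callable'}
```

Routing each column through one classifier like this, rather than calling `gb[ic].agg(af)` directly, means a string such as `'sum'` and the builtin `sum` land in the same branch in both code paths.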
--
This is an automated message from the Apache Git Service.