[
https://issues.apache.org/jira/browse/BEAM-11628?focusedWorklogId=572072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-572072
]
ASF GitHub Bot logged work on BEAM-11628:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Mar/21 17:45
Start Date: 25/Mar/21 17:45
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #13843:
URL: https://github.com/apache/beam/pull/13843#discussion_r601708246
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1785,9 +1855,63 @@ def agg(self, fn):
return DeferredDataFrame(
expressions.ComputedExpression(
'agg',
- lambda df: df.agg(fn), [self._expr],
+ lambda gb: gb.agg(fn), [self._expr],
requires_partition_by=partitionings.Index(),
- preserves_partition_by=partitionings.Arbitrary()))
+ preserves_partition_by=partitionings.Singleton()))
+
+ def apply(self, fn, *args, **kwargs):
+ if self._grouping_columns and not self._projection:
+ grouping_columns = self._grouping_columns
+ def fn_wrapper(x, *args, **kwargs):
+ # TODO(BEAM-11710): Moving a column to an index and back is lossy
+ # since indexes dont support as many dtypes. We should keep the
original
+ # column in groupby() instead. We need it anyway in case the grouping
+ # column is projected, which is allowed.
+
+ # Move the columns back to columns
+ x = x.assign(**{col: x.index.get_level_values(col)
+ for col in grouping_columns})
+ x = x.droplevel(grouping_columns)
+ return fn(x, *args, **kwargs)
+ else:
+ fn_wrapper = fn
+
+ project = _maybe_project_func(self._projection)
+
+ # Unfortunately pandas does not execute fn to determine the right proxy.
+ # We run user fn on a proxy here to detect the return type and generate the
+ # proxy.
+ result = fn_wrapper(project(self._ungrouped_with_index.proxy()))
+ if isinstance(result, pd.core.generic.NDFrame):
+ proxy = result[:0]
+
+ def index_to_arrays(index):
+ return [index.get_level_values(level)
+ for level in range(index.nlevels)]
+
+ # The final result will have the grouped indexes + the indexes from the
+ # result
+ proxy.index = pd.MultiIndex.from_arrays(
+ index_to_arrays(self._ungrouped.proxy().index) +
+ index_to_arrays(proxy.index),
+ names=self._ungrouped.proxy().index.names + proxy.index.names)
+ else:
+ dtype = pd.Series([result]).dtype
Review comment:
So if I understand correctly, in this case Pandas returns a scalar, but
we want to treat it at a Series (for ease of later processing), right?
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -315,6 +314,40 @@ def test_dataframe_drop_ignore_errors(self):
self._run_test(
lambda df: df.drop(index='cow', columns='small', errors='ignore'), df)
+ def test_groupby_apply(self):
+ df = GROUPBY_DF
+
+ def median_sum_fn(x):
+ return (x.foo + x.bar).median()
+
+ describe = lambda df: df.describe()
Review comment:
Why not use `DataFrame.describe`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 572072)
Time Spent: 8h 20m (was: 8h 10m)
> Implement GroupBy.apply
> -----------------------
>
> Key: BEAM-11628
> URL: https://issues.apache.org/jira/browse/BEAM-11628
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Labels: dataframe-api
> Time Spent: 8h 20m
> Remaining Estimate: 0h
>
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.core.groupby.GroupBy.apply.html
> We can partition by key and then apply func on each key. It should be
> possible to create a proxy by passing a proxy to the function.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)