yeandy commented on a change in pull request #16706:
URL: https://github.com/apache/beam/pull/16706#discussion_r798061719
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -4017,7 +4076,8 @@ def do_partition_apply(df):
by=grouping_columns or None)
gb = project(gb)
- return gb.apply(func, *args, **kwargs)
+
Review comment:
nit: extra space
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -4117,8 +4172,15 @@ def apply_fn(df):
@property # type: ignore
@frame_base.with_docs_from(DataFrameGroupBy)
def dtypes(self):
- grouping_columns = self._grouping_columns
- return self.apply(lambda df: df.drop(grouping_columns, axis=1).dtypes)
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dtypes',
+ lambda gb: gb.dtypes,
+ [self._expr],
+ requires_partition_by=partitionings.Arbitrary(),
+ preserves_partition_by=partitionings.Arbitrary()
+ )
+ )
Review comment:
Nice
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3975,7 +3973,19 @@ def apply(self, func, *args, **kwargs):
object of the same type as what will be returned when the pipeline is
processing actual data. If the result is a pandas object it should have the
same type and name (for a Series) or column types and names (for
- a DataFrame) as the actual results."""
+ a DataFrame) as the actual results.
+
+ Note that in pandas, ``apply`` attempts to detect if the index is
unmodified
+ in ``func`` (indicating ``func`` is a transform) and drops the duplicate
+ index in the output. To determine this, pandas tests the indexes for
+ equality. However, Beam cannot do this since it is sensitive to the input
+ data, instead this implementation tests if the indexes are equivalent
Review comment:
```suggestion
data; instead this implementation tests if the indexes are equivalent
```
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -4017,7 +4076,8 @@ def do_partition_apply(df):
by=grouping_columns or None)
gb = project(gb)
- return gb.apply(func, *args, **kwargs)
+
Review comment:
On this topic, what's Beam's guidance on flexibility with Python
styling? I'm running formatting/linting on the commit hooks; they don't seem
too strict or anything, and I don't want to focus on this too much. I suppose
everyone will always impart some of their own personality to the code over time. 😄
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs):
fn_input = project(self._ungrouped_with_index.proxy().reset_index(
grouping_columns, drop=True))
result = func(fn_input)
- if isinstance(result, pd.core.generic.NDFrame):
- if result.index is fn_input.index:
- proxy = result
+ def index_to_arrays(index):
+ return [index.get_level_values(level)
+ for level in range(index.nlevels)]
+
+
+ # By default do_apply will just call pandas apply()
+ # We override it below if necessary
+ do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+ if (isinstance(result, pd.core.generic.NDFrame) and
+ result.index is fn_input.index):
+ # Special case where apply fn is a transform
+ # Note we trust that if the user fn produces a proxy with the identical
+ # index, it will produce results with identical indexes at execution
+ # time too
+ proxy = result
+ elif isinstance(result, pd.DataFrame):
+ # apply fn is not a transform, we need to make sure the original index
+ # values are prepended to the result's index
+ proxy = result[:0]
+
+ # First adjust proxy
+ proxy.index = pd.MultiIndex.from_arrays(
+ index_to_arrays(self._ungrouped.proxy().index) +
+ index_to_arrays(proxy.index),
+ names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+
+ # Then override do_apply function
+ new_index_names = self._ungrouped.proxy().index.names
+ if len(new_index_names) > 1:
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ # key is a tuple with the MultiIndex values for this key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key[i]] * len(df) for i in range(len(new_index_names))] +
+ index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
else:
- proxy = result[:0]
-
- def index_to_arrays(index):
- return [index.get_level_values(level)
- for level in range(index.nlevels)]
-
- # The final result will have the grouped indexes + the indexes from the
- # result
- proxy.index = pd.MultiIndex.from_arrays(
- index_to_arrays(self._ungrouped.proxy().index) +
- index_to_arrays(proxy.index),
- names=self._ungrouped.proxy().index.names + proxy.index.names)
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key] * len(df)] + index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
+
+
+ do_apply = lambda gb: pd.concat([
+ add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+ for k in gb.groups.keys()])
+ elif isinstance(result, pd.Series):
+ if isinstance(fn_input, pd.DataFrame):
+ # DataFrameGroupBy
+ dtype = pd.Series([result]).dtype
+ proxy = pd.DataFrame(columns=result.index,
+ dtype=result.dtype,
+ index=self._ungrouped.proxy().index)
+ elif isinstance(fn_input, pd.Series):
+ # SeriesGroupBy
+ proxy = pd.Series(dtype=result.dtype,
+ name=result.name,
+ index=index_to_arrays(self._ungrouped.proxy().index)
+
+ index_to_arrays(result[:0].index))
Review comment:
Can you help me better understand the logic under `elif
isinstance(result, pd.Series):`, for both the `if isinstance(fn_input,
pd.DataFrame):` and `elif isinstance(fn_input, pd.Series):` cases?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs):
fn_input = project(self._ungrouped_with_index.proxy().reset_index(
grouping_columns, drop=True))
result = func(fn_input)
- if isinstance(result, pd.core.generic.NDFrame):
- if result.index is fn_input.index:
- proxy = result
+ def index_to_arrays(index):
+ return [index.get_level_values(level)
+ for level in range(index.nlevels)]
+
+
+ # By default do_apply will just call pandas apply()
+ # We override it below if necessary
+ do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+ if (isinstance(result, pd.core.generic.NDFrame) and
+ result.index is fn_input.index):
+ # Special case where apply fn is a transform
+ # Note we trust that if the user fn produces a proxy with the identical
+ # index, it will produce results with identical indexes at execution
+ # time too
+ proxy = result
+ elif isinstance(result, pd.DataFrame):
+ # apply fn is not a transform, we need to make sure the original index
+ # values are prepended to the result's index
+ proxy = result[:0]
+
+ # First adjust proxy
+ proxy.index = pd.MultiIndex.from_arrays(
+ index_to_arrays(self._ungrouped.proxy().index) +
+ index_to_arrays(proxy.index),
+ names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+
+ # Then override do_apply function
+ new_index_names = self._ungrouped.proxy().index.names
+ if len(new_index_names) > 1:
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ # key is a tuple with the MultiIndex values for this key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key[i]] * len(df) for i in range(len(new_index_names))] +
+ index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
else:
- proxy = result[:0]
-
- def index_to_arrays(index):
- return [index.get_level_values(level)
- for level in range(index.nlevels)]
-
- # The final result will have the grouped indexes + the indexes from the
- # result
- proxy.index = pd.MultiIndex.from_arrays(
- index_to_arrays(self._ungrouped.proxy().index) +
- index_to_arrays(proxy.index),
- names=self._ungrouped.proxy().index.names + proxy.index.names)
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key] * len(df)] + index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
+
+
+ do_apply = lambda gb: pd.concat([
+ add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+ for k in gb.groups.keys()])
+ elif isinstance(result, pd.Series):
+ if isinstance(fn_input, pd.DataFrame):
+ # DataFrameGroupBy
+ dtype = pd.Series([result]).dtype
+ proxy = pd.DataFrame(columns=result.index,
+ dtype=result.dtype,
+ index=self._ungrouped.proxy().index)
Review comment:
If `fn_input` is a `pd.DataFrame`, do we still want the proxy to be of type
`pd.DataFrame` even though the result is a `pd.Series`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]