[
https://issues.apache.org/jira/browse/BEAM-13605?focusedWorklogId=719763&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-719763
]
ASF GitHub Bot logged work on BEAM-13605:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Feb/22 23:16
Start Date: 02/Feb/22 23:16
Worklog Time Spent: 10m
Work Description: yeandy commented on a change in pull request #16706:
URL: https://github.com/apache/beam/pull/16706#discussion_r798061719
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -4017,7 +4076,8 @@ def do_partition_apply(df):
by=grouping_columns or None)
gb = project(gb)
- return gb.apply(func, *args, **kwargs)
+
Review comment:
nit: extra space
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -4117,8 +4172,15 @@ def apply_fn(df):
@property # type: ignore
@frame_base.with_docs_from(DataFrameGroupBy)
def dtypes(self):
- grouping_columns = self._grouping_columns
- return self.apply(lambda df: df.drop(grouping_columns, axis=1).dtypes)
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dtypes',
+ lambda gb: gb.dtypes,
+ [self._expr],
+ requires_partition_by=partitionings.Arbitrary(),
+ preserves_partition_by=partitionings.Arbitrary()
+ )
+ )
Review comment:
Nice
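For my own understanding, here is a quick pandas-only check of what the deferred `gb.dtypes` call computes (just an illustration, not code from the PR). If I read it right, this is also why `Arbitrary()` partitioning is sufficient: dtypes are a per-column property, so the per-group answer does not depend on how rows are partitioned.
```python
import pandas as pd

# Toy frame; in pandas 1.4, DataFrameGroupBy.dtypes reports the column
# dtypes once per group key.
df = pd.DataFrame({'k': ['a', 'a', 'b'], 'x': [1, 2, 3], 'y': [1.0, 2.0, 3.0]})
print(df.groupby('k').dtypes)  # one row of dtypes for each group key
```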
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3975,7 +3973,19 @@ def apply(self, func, *args, **kwargs):
object of the same type as what will be returned when the pipeline is
processing actual data. If the result is a pandas object it should have the
same type and name (for a Series) or column types and names (for
- a DataFrame) as the actual results."""
+ a DataFrame) as the actual results.
+
+ Note that in pandas, ``apply`` attempts to detect if the index is unmodified
+ in ``func`` (indicating ``func`` is a transform) and drops the duplicate
+ index in the output. To determine this, pandas tests the indexes for
+ equality. However, Beam cannot do this since it is sensitive to the input
+ data, instead this implementation tests if the indexes are equivalent
Review comment:
```suggestion
data; instead this implementation tests if the indexes are equivalent
```
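Also, for anyone else reading this docstring change, a small pandas-only illustration of the two behaviors it describes (a sketch for discussion, not code from the PR):
```python
import pandas as pd

df = pd.DataFrame({'k': ['a', 'a', 'b'], 'x': [1, 2, 3]}, index=[10, 11, 12])

# func leaves each group's index untouched, so pandas treats apply() as a
# transform and the result keeps the original index [10, 11, 12].
print(df.groupby('k').apply(lambda g: g * 2).index.tolist())

# func replaces the index, so pandas prepends the group keys and the result
# index becomes a MultiIndex of (group key, new index).
print(df.groupby('k').apply(lambda g: g.reset_index(drop=True)).index.nlevels)
```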
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -4017,7 +4076,8 @@ def do_partition_apply(df):
by=grouping_columns or None)
gb = project(gb)
- return gb.apply(func, *args, **kwargs)
+
Review comment:
On this topic, what's Beam's guidance on flexibility with Python
styling? I'm running formatting/linting on the commit hooks; they don't seem
too strict or anything, and I don't want to focus on this too much. I suppose
everyone will always impart some of their personality to the code over time. 😄
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs):
fn_input = project(self._ungrouped_with_index.proxy().reset_index(
grouping_columns, drop=True))
result = func(fn_input)
- if isinstance(result, pd.core.generic.NDFrame):
- if result.index is fn_input.index:
- proxy = result
+ def index_to_arrays(index):
+ return [index.get_level_values(level)
+ for level in range(index.nlevels)]
+
+
+ # By default do_apply will just call pandas apply()
+ # We override it below if necessary
+ do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+ if (isinstance(result, pd.core.generic.NDFrame) and
+ result.index is fn_input.index):
+ # Special case where apply fn is a transform
+ # Note we trust that if the user fn produces a proxy with the identical
+ # index, it will produce results with identical indexes at execution
+ # time too
+ proxy = result
+ elif isinstance(result, pd.DataFrame):
+ # apply fn is not a transform, we need to make sure the original index
+ # values are prepended to the result's index
+ proxy = result[:0]
+
+ # First adjust proxy
+ proxy.index = pd.MultiIndex.from_arrays(
+ index_to_arrays(self._ungrouped.proxy().index) +
+ index_to_arrays(proxy.index),
+ names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+
+ # Then override do_apply function
+ new_index_names = self._ungrouped.proxy().index.names
+ if len(new_index_names) > 1:
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ # key is a tuple with the MultiIndex values for this key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key[i]] * len(df) for i in range(len(new_index_names))] +
+ index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
else:
- proxy = result[:0]
-
- def index_to_arrays(index):
- return [index.get_level_values(level)
- for level in range(index.nlevels)]
-
- # The final result will have the grouped indexes + the indexes from the
- # result
- proxy.index = pd.MultiIndex.from_arrays(
- index_to_arrays(self._ungrouped.proxy().index) +
- index_to_arrays(proxy.index),
- names=self._ungrouped.proxy().index.names + proxy.index.names)
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key] * len(df)] + index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
+
+
+ do_apply = lambda gb: pd.concat([
+ add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+ for k in gb.groups.keys()])
+ elif isinstance(result, pd.Series):
+ if isinstance(fn_input, pd.DataFrame):
+ # DataFrameGroupBy
+ dtype = pd.Series([result]).dtype
+ proxy = pd.DataFrame(columns=result.index,
+ dtype=result.dtype,
+ index=self._ungrouped.proxy().index)
+ elif isinstance(fn_input, pd.Series):
+ # SeriesGroupBy
+ proxy = pd.Series(dtype=result.dtype,
+ name=result.name,
+          index=index_to_arrays(self._ungrouped.proxy().index) +
+          index_to_arrays(result[:0].index))
Review comment:
Can you help me better understand the logic under `elif
isinstance(result, pd.Series):`, for both the `isinstance(fn_input,
pd.DataFrame)` and `isinstance(fn_input, pd.Series)` branches?
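For reference while we discuss, here is how I currently read it, as a pandas-only sketch of the execution-time shapes the two proxies seem to be mirroring (illustration only, not code from the PR):
```python
import pandas as pd

df = pd.DataFrame({'k': ['a', 'a', 'b'], 'x': [1, 2, 3], 'y': [4.0, 5.0, 6.0]})

# DataFrameGroupBy with func returning a Series per group: pandas pivots the
# per-group Series into a DataFrame, one row per group key, with the Series
# index becoming the columns. That seems to match the DataFrame proxy built
# from columns=result.index and the grouped index.
out = df.groupby('k').apply(lambda g: g[['x', 'y']].mean())
print(type(out).__name__, list(out.columns), list(out.index))

# SeriesGroupBy with func returning a Series per group: pandas concatenates
# the per-group Series and prepends the group key as an extra index level.
# That seems to match the Series proxy whose index is the grouped index
# levels plus the result's index levels.
out = df.groupby('k')['x'].apply(lambda s: s.describe())
print(type(out).__name__, out.index.nlevels)
```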
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs):
fn_input = project(self._ungrouped_with_index.proxy().reset_index(
grouping_columns, drop=True))
result = func(fn_input)
- if isinstance(result, pd.core.generic.NDFrame):
- if result.index is fn_input.index:
- proxy = result
+ def index_to_arrays(index):
+ return [index.get_level_values(level)
+ for level in range(index.nlevels)]
+
+
+ # By default do_apply will just call pandas apply()
+ # We override it below if necessary
+ do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+ if (isinstance(result, pd.core.generic.NDFrame) and
+ result.index is fn_input.index):
+ # Special case where apply fn is a transform
+ # Note we trust that if the user fn produces a proxy with the identical
+ # index, it will produce results with identical indexes at execution
+ # time too
+ proxy = result
+ elif isinstance(result, pd.DataFrame):
+ # apply fn is not a transform, we need to make sure the original index
+ # values are prepended to the result's index
+ proxy = result[:0]
+
+ # First adjust proxy
+ proxy.index = pd.MultiIndex.from_arrays(
+ index_to_arrays(self._ungrouped.proxy().index) +
+ index_to_arrays(proxy.index),
+ names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+
+ # Then override do_apply function
+ new_index_names = self._ungrouped.proxy().index.names
+ if len(new_index_names) > 1:
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ # key is a tuple with the MultiIndex values for this key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key[i]] * len(df) for i in range(len(new_index_names))] +
+ index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
else:
- proxy = result[:0]
-
- def index_to_arrays(index):
- return [index.get_level_values(level)
- for level in range(index.nlevels)]
-
- # The final result will have the grouped indexes + the indexes from the
- # result
- proxy.index = pd.MultiIndex.from_arrays(
- index_to_arrays(self._ungrouped.proxy().index) +
- index_to_arrays(proxy.index),
- names=self._ungrouped.proxy().index.names + proxy.index.names)
+ def add_key_index(key, df):
+ # df is a dataframe or Series representing the result of func for
+ # a single key
+ df.index = pd.MultiIndex.from_arrays(
+ [[key] * len(df)] + index_to_arrays(df.index),
+ names=new_index_names + df.index.names)
+ return df
+
+
+ do_apply = lambda gb: pd.concat([
+ add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+ for k in gb.groups.keys()])
+ elif isinstance(result, pd.Series):
+ if isinstance(fn_input, pd.DataFrame):
+ # DataFrameGroupBy
+ dtype = pd.Series([result]).dtype
+ proxy = pd.DataFrame(columns=result.index,
+ dtype=result.dtype,
+ index=self._ungrouped.proxy().index)
Review comment:
If `fn_input` is a `pd.DataFrame`, do we still want the proxy to be of
type `pd.DataFrame` even though the result is a `pd.Series`?
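To make the question concrete, this is roughly what that constructor produces, with made-up stand-in values (not code from the PR):
```python
import pandas as pd

result = pd.Series({'x': 2.0, 'y': 5.0})  # stand-in for func(fn_input)
grouped_index = pd.Index([], name='k')    # stand-in for self._ungrouped.proxy().index

# An empty DataFrame whose columns come from the example Series returned by
# func, i.e. the shape pandas itself produces when a DataFrameGroupBy.apply
# func returns one Series per group (one row per group key).
proxy = pd.DataFrame(columns=result.index, dtype=result.dtype, index=grouped_index)
print(type(proxy).__name__, proxy.shape, list(proxy.columns))
# DataFrame (0, 2) ['x', 'y']
```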
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 719763)
Time Spent: 5h 40m (was: 5.5h)
> Support pandas 1.4.0 in the DataFrame API
> -----------------------------------------
>
> Key: BEAM-13605
> URL: https://issues.apache.org/jira/browse/BEAM-13605
> Project: Beam
> Issue Type: Improvement
> Components: dsl-dataframe
> Reporter: Brian Hulette
> Assignee: Andy Ye
> Priority: P2
> Time Spent: 5h 40m
> Remaining Estimate: 0h
>
> 1.4.0rc1 is out now; we should verify it works with the DataFrame API, then
> increase the version range to allow it.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)