[https://issues.apache.org/jira/browse/BEAM-12017?focusedWorklogId=598200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-598200]
ASF GitHub Bot logged work on BEAM-12017:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 17/May/21 18:50
Start Date: 17/May/21 18:50
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request
#14656:
URL: https://github.com/apache/beam/pull/14656#discussion_r633780616
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -263,6 +263,39 @@ def test_where_concrete_args(self):
}), axis=1),
df)
+ def test_combine_dataframe(self):
+ with expressions.allow_non_parallel_operations():
+ df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
+ self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2)
Review comment:
```suggestion
df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2,
nonparallel=True)
```
I recently added this option, `nonparallel`, which makes `_run_test` do some
additional verification (including checking that a `reason` was specified).
Please use it instead of the `allow_non_parallel_operations` block.
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -263,6 +263,39 @@ def test_where_concrete_args(self):
}), axis=1),
df)
+ def test_combine_dataframe(self):
+ with expressions.allow_non_parallel_operations():
+ df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
+ self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ self._run_test(lambda df1, df2: df1.combine(df2, np.minimum), df1, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ self._run_test(
+ lambda df1,
+ df2: df1.combine(df2, take_smaller, fill_value=-5),
+ df1,
+ df2)
+
+ def test_combine_Series(self):
+ with expressions.allow_non_parallel_operations():
+ s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
+ s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+ self._run_test(lambda s1, s2: s1.combine(s2, max), s1, s2)
+
+ def test_combine_first(self):
+ df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ s1 = pd.Series([1, np.nan])
+ s2 = pd.Series([3, 4])
+ self._run_test(lambda s1, s2: s1.combine_first(s2), s1, s2)
+ self._run_test(lambda df1, df2: df1.combine_first(df2), df1, df2)
Review comment:
Similarly here
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -263,6 +263,39 @@ def test_where_concrete_args(self):
}), axis=1),
df)
+ def test_combine_dataframe(self):
+ with expressions.allow_non_parallel_operations():
+ df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
+ self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ self._run_test(lambda df1, df2: df1.combine(df2, np.minimum), df1, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ self._run_test(
+ lambda df1,
+ df2: df1.combine(df2, take_smaller, fill_value=-5),
+ df1,
+ df2)
Review comment:
Since each call to `_run_test` is using a different input, could you
break them out into separate test methods?
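Putting the two review suggestions together, the split might look like the sketch below. This is hypothetical, not the final PR code: the real `_run_test` lives in `frames_test.py` and also evaluates the deferred (Beam) version of each expression, so a minimal stand-in harness is included here only so the snippet runs standalone.

```python
import unittest

import pandas as pd


class CombineTests(unittest.TestCase):
  # Stand-in for frames_test._run_test so this sketch runs standalone: it
  # just evaluates func on plain pandas inputs and returns the result. The
  # real harness also runs the deferred version and, when nonparallel=True,
  # does extra verification (including that a Singleton `reason` was given).
  def _run_test(self, func, *args, nonparallel=False):
    return func(*args)

  def test_combine_dataframe(self):
    df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
    df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
    take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
    self._run_test(
        lambda df, df2: df.combine(df2, take_smaller),
        df, df2, nonparallel=True)

  def test_combine_dataframe_with_fill(self):
    df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
    df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
    take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
    self._run_test(
        lambda df1, df2: df1.combine(df2, take_smaller, fill_value=-5),
        df1, df2, nonparallel=True)
```

One method per input set keeps failures attributable to a single case, which is what the reviewer is asking for.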
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -564,6 +564,14 @@ def dtype(self):
return self._expr.proxy().dtype
isin = frame_base._elementwise_method('isin', base=pd.DataFrame)
+ combine_first = frame_base._elementwise_method(
+ 'combine_first', base=pd.DataFrame)
+
+ combine = frame_base._proxy_method(
+ 'combine',
+ base=pd.DataFrame,
+ requires_partition_by=expressions.partitionings.Singleton(),
Review comment:
We recently added the ability for `Singleton()` requirements to have a
"reason" specified, which describes why a particular operation is not
parallelizable. The reason is included in error messages so users have
some insight. Please add one here, something like:
```suggestion
      requires_partition_by=expressions.partitionings.Singleton(
          reason="combine() is not parallelizable because func might "
          "operate on the full dataset."),
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 598200)
Time Spent: 4h 40m (was: 4.5h)
> Implement combine, combine_first for DataFrame and Series
> ---------------------------------------------------------
>
> Key: BEAM-12017
> URL: https://issues.apache.org/jira/browse/BEAM-12017
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Irwin Alejandro Rodirguez Ramirez
> Priority: P3
> Labels: dataframe-api
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> Add an implementation for
> [combine|https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.combine.html]
> and combine_first that works for DeferredDataFrame and DeferredSeries, and
> is fully unit tested with some combination of pandas_doctests_test.py and
> frames_test.py.
> https://github.com/apache/beam/pull/14274 is an example of a typical PR that
> adds new operations.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)