TheNeuralBit commented on a change in pull request #14656:
URL: https://github.com/apache/beam/pull/14656#discussion_r633780616
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -263,6 +263,39 @@ def test_where_concrete_args(self):
}), axis=1),
df)
+ def test_combine_dataframe(self):
+ with expressions.allow_non_parallel_operations():
+ df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
+ self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2)
Review comment:
```suggestion
df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2,
nonparallel=True)
```
       I recently added the `nonparallel` option, which makes `_run_test` do some
additional verification (including checking that a `reason` was specified).
Please use that instead of the `allow_non_parallel_operations` block.
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -263,6 +263,39 @@ def test_where_concrete_args(self):
}), axis=1),
df)
+ def test_combine_dataframe(self):
+ with expressions.allow_non_parallel_operations():
+ df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
+ self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ self._run_test(lambda df1, df2: df1.combine(df2, np.minimum), df1, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ self._run_test(
+ lambda df1,
+ df2: df1.combine(df2, take_smaller, fill_value=-5),
+ df1,
+ df2)
+
+ def test_combine_Series(self):
+ with expressions.allow_non_parallel_operations():
+ s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
+ s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+ self._run_test(lambda s1, s2: s1.combine(s2, max), s1, s2)
+
+ def test_combine_first(self):
+ df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ s1 = pd.Series([1, np.nan])
+ s2 = pd.Series([3, 4])
+ self._run_test(lambda s1, s2: s1.combine_first(s2), s1, s2)
+ self._run_test(lambda df1, df2: df1.combine_first(df2), df1, df2)
Review comment:
Similarly here
##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -263,6 +263,39 @@ def test_where_concrete_args(self):
}), axis=1),
df)
+ def test_combine_dataframe(self):
+ with expressions.allow_non_parallel_operations():
+ df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
+ self._run_test(lambda df, df2: df.combine(df2, take_smaller), df, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+ self._run_test(lambda df1, df2: df1.combine(df2, np.minimum), df1, df2)
+ df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
+ df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ self._run_test(
+ lambda df1,
+ df2: df1.combine(df2, take_smaller, fill_value=-5),
+ df1,
+ df2)
Review comment:
Since each call to `_run_test` is using a different input, could you
break them out into separate test methods?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -564,6 +564,14 @@ def dtype(self):
return self._expr.proxy().dtype
isin = frame_base._elementwise_method('isin', base=pd.DataFrame)
+ combine_first = frame_base._elementwise_method(
+ 'combine_first', base=pd.DataFrame)
+
+ combine = frame_base._proxy_method(
+ 'combine',
+ base=pd.DataFrame,
+ requires_partition_by=expressions.partitionings.Singleton(),
Review comment:
We recently added the ability for Singleton() requirements to have a
"reason" specified, that describes why a particular operation is not
parallelizable. The reason will be included in error messages so users have
some insight. Please add one here, something like:
```suggestion
       requires_partition_by=expressions.partitionings.Singleton(
           reason="combine() is not parallelizable because func might "
           "operate on the full dataset."),
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]