This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cec16b6 [BEAM-11532] Fix edge case in merge where left_on and
right_on contain equivalent column names (#13634)
cec16b6 is described below
commit cec16b662d4e8b9eb757b3782cb9c49bb62e576a
Author: Brian Hulette <[email protected]>
AuthorDate: Wed Dec 30 12:57:03 2020 -0800
[BEAM-11532] Fix edge case in merge where left_on and right_on contain
equivalent column names (#13634)
* Fix merge with same name column
* Fix error when merging with implicit on
* Remove unnecessary py3 check
* Drop column prior to merge
---
sdks/python/apache_beam/dataframe/frames.py | 26 ++++++--
sdks/python/apache_beam/dataframe/frames_test.py | 78 +++++++++++++++++++++++-
2 files changed, 98 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 442768b..be7b933 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -1182,6 +1182,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
right_on,
left_index,
right_index,
+ suffixes,
**kwargs):
self_proxy = self._expr.proxy()
right_proxy = right._expr.proxy()
@@ -1195,14 +1196,14 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
right_index=right_index,
**kwargs)
if not any([on, left_on, right_on, left_index, right_index]):
- on = [col for col in self_proxy.columns() if col in right_proxy.columns()]
+ on = [col for col in self_proxy.columns if col in right_proxy.columns]
if not left_on:
left_on = on
- elif not isinstance(left_on, list):
+ if left_on and not isinstance(left_on, list):
left_on = [left_on]
if not right_on:
right_on = on
- elif not isinstance(right_on, list):
+ if right_on and not isinstance(right_on, list):
right_on = [right_on]
if left_index:
@@ -1215,11 +1216,25 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
else:
indexed_right = right.set_index(right_on, drop=False)
+ if left_on and right_on:
+ common_cols = set(left_on).intersection(right_on)
+ if len(common_cols):
+ # When merging on the same column name from both dfs, we need to make
+ # sure only one df has the column. Otherwise we end up with
+ # two duplicate columns, one with lsuffix and one with rsuffix.
+ # It's safe to drop from either because the data has already been duped
+ # to the index.
+ indexed_right = indexed_right.drop(columns=common_cols)
+
+
merged = frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'merge',
- lambda left, right: left.merge(
- right, left_index=True, right_index=True, **kwargs),
+ lambda left, right: left.merge(right,
+ left_index=True,
+ right_index=True,
+ suffixes=suffixes,
+ **kwargs),
[indexed_left._expr, indexed_right._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Index()))
@@ -1227,6 +1242,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
if left_index or right_index:
return merged
else:
+
return merged.reset_index(drop=True)
@frame_base.args_to_kwargs(pd.DataFrame)
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 18799cb..4cff34e 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -124,7 +124,6 @@ class DeferredFrameTest(unittest.TestCase):
self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
- @unittest.skipIf(sys.version_info <= (3, ), 'differing signature')
def test_merge(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
@@ -152,6 +151,83 @@ class DeferredFrameTest(unittest.TestCase):
df1,
df2)
+ def test_merge_on_index(self):
+ # This is from the pandas doctests, but fails due to re-indexing being
+ # order-sensitive.
+ df1 = pd.DataFrame({
+ 'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
+ }).set_index('lkey')
+ df2 = pd.DataFrame({
+ 'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
+ }).set_index('rkey')
+ with beam.dataframe.allow_non_parallel_operations():
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, left_index=True, right_index=True).sort_values(
+ ['value_x', 'value_y']),
+ df1,
+ df2)
+
+ def test_merge_same_key(self):
+ df1 = pd.DataFrame({
+ 'key': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
+ })
+ df2 = pd.DataFrame({
+ 'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
+ })
+ with beam.dataframe.allow_non_parallel_operations():
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, on='key').rename(index=lambda x: '*').sort_values(
+ ['value_x', 'value_y']),
+ df1,
+ df2)
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
+ index=lambda x: '*').sort_values(['value_left', 'value_right']),
+ df1,
+ df2)
+
+ def test_merge_same_key_doctest(self):
+ df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
+ df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
+
+ with beam.dataframe.allow_non_parallel_operations():
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*').
+ sort_values(['b', 'c']),
+ df1,
+ df2)
+ # Test without specifying 'on'
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left').rename(index=lambda x: '*').
+ sort_values(['b', 'c']),
+ df1,
+ df2)
+
+ def test_merge_same_key_suffix_collision(self):
+ df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
+ df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})
+
+ with beam.dataframe.allow_non_parallel_operations():
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(
+ df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).
+ rename(index=lambda x: '*').sort_values(['b', 'c']),
+ df1,
+ df2)
+ # Test without specifying 'on'
+ self._run_test(
+ lambda df1,
+ df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
+ rename(index=lambda x: '*').sort_values(['b', 'c']),
+ df1,
+ df2)
+
def test_series_getitem(self):
s = pd.Series([x**2 for x in range(10)])
self._run_test(lambda s: s[...], s)