This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cec16b6  [BEAM-11532] Fix edge case in merge where left_on and right_on contain equivalent column names (#13634)
cec16b6 is described below

commit cec16b662d4e8b9eb757b3782cb9c49bb62e576a
Author: Brian Hulette <[email protected]>
AuthorDate: Wed Dec 30 12:57:03 2020 -0800

    [BEAM-11532] Fix edge case in merge where left_on and right_on contain equivalent column names (#13634)
    
    * Fix merge with same name column
    
    * Fix error when merging with implicit on
    
    * Remove unnecessary py3 check
    
    * Drop column prior to merge
---
 sdks/python/apache_beam/dataframe/frames.py      | 26 ++++++--
 sdks/python/apache_beam/dataframe/frames_test.py | 78 +++++++++++++++++++++++-
 2 files changed, 98 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 442768b..be7b933 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -1182,6 +1182,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
       right_on,
       left_index,
       right_index,
+      suffixes,
       **kwargs):
     self_proxy = self._expr.proxy()
     right_proxy = right._expr.proxy()
@@ -1195,14 +1196,14 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
         right_index=right_index,
         **kwargs)
     if not any([on, left_on, right_on, left_index, right_index]):
-      on = [col for col in self_proxy.columns() if col in right_proxy.columns()]
+      on = [col for col in self_proxy.columns if col in right_proxy.columns]
     if not left_on:
       left_on = on
-    elif not isinstance(left_on, list):
+    if left_on and not isinstance(left_on, list):
       left_on = [left_on]
     if not right_on:
       right_on = on
-    elif not isinstance(right_on, list):
+    if right_on and not isinstance(right_on, list):
       right_on = [right_on]
 
     if left_index:
@@ -1215,11 +1216,25 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
     else:
       indexed_right = right.set_index(right_on, drop=False)
 
+    if left_on and right_on:
+      common_cols = set(left_on).intersection(right_on)
+      if len(common_cols):
+        # When merging on the same column name from both dfs, we need to make
+        # sure only one df has the column. Otherwise we end up with
+        # two duplicate columns, one with lsuffix and one with rsuffix.
+        # It's safe to drop from either because the data has already been duped
+        # to the index.
+        indexed_right = indexed_right.drop(columns=common_cols)
+
+
     merged = frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'merge',
-            lambda left, right: left.merge(
-                right, left_index=True, right_index=True, **kwargs),
+            lambda left, right: left.merge(right,
+                                           left_index=True,
+                                           right_index=True,
+                                           suffixes=suffixes,
+                                           **kwargs),
             [indexed_left._expr, indexed_right._expr],
             preserves_partition_by=partitionings.Singleton(),
             requires_partition_by=partitionings.Index()))
@@ -1227,6 +1242,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
     if left_index or right_index:
       return merged
     else:
+
       return merged.reset_index(drop=True)
 
   @frame_base.args_to_kwargs(pd.DataFrame)
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 18799cb..4cff34e 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -124,7 +124,6 @@ class DeferredFrameTest(unittest.TestCase):
 
     self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
 
-  @unittest.skipIf(sys.version_info <= (3, ), 'differing signature')
   def test_merge(self):
     # This is from the pandas doctests, but fails due to re-indexing being
     # order-sensitive.
@@ -152,6 +151,83 @@ class DeferredFrameTest(unittest.TestCase):
           df1,
           df2)
 
+  def test_merge_on_index(self):
+    # This is from the pandas doctests, but fails due to re-indexing being
+    # order-sensitive.
+    df1 = pd.DataFrame({
+        'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
+    }).set_index('lkey')
+    df2 = pd.DataFrame({
+        'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
+    }).set_index('rkey')
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, left_index=True, right_index=True).sort_values(
+              ['value_x', 'value_y']),
+          df1,
+          df2)
+
+  def test_merge_same_key(self):
+    df1 = pd.DataFrame({
+        'key': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
+    })
+    df2 = pd.DataFrame({
+        'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
+    })
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, on='key').rename(index=lambda x: '*').sort_values(
+              ['value_x', 'value_y']),
+          df1,
+          df2)
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
+              index=lambda x: '*').sort_values(['value_left', 'value_right']),
+          df1,
+          df2)
+
+  def test_merge_same_key_doctest(self):
+    df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
+    df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
+
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*').
+          sort_values(['b', 'c']),
+          df1,
+          df2)
+      # Test without specifying 'on'
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, how='left').rename(index=lambda x: '*').
+          sort_values(['b', 'c']),
+          df1,
+          df2)
+
+  def test_merge_same_key_suffix_collision(self):
+    df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
+    df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})
+
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(
+              df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).
+          rename(index=lambda x: '*').sort_values(['b', 'c']),
+          df1,
+          df2)
+      # Test without specifying 'on'
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
+          rename(index=lambda x: '*').sort_values(['b', 'c']),
+          df1,
+          df2)
+
   def test_series_getitem(self):
     s = pd.Series([x**2 for x in range(10)])
     self._run_test(lambda s: s[...], s)

Reply via email to