This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c6a2021  [SPARK-36399][PYTHON] Implement DataFrame.combine_first
c6a2021 is described below

commit c6a2021fec5bab9069fbfba33f75d4415ea76e99
Author: Xinrong Meng <[email protected]>
AuthorDate: Tue Aug 31 19:50:16 2021 -0700

    [SPARK-36399][PYTHON] Implement DataFrame.combine_first
    
    ### What changes were proposed in this pull request?
    Implement `DataFrame.combine_first`.
    
    The PR is based on https://github.com/databricks/koalas/pull/1950. Thanks AishwaryaKalloli for the prototype.
    
    ### Why are the changes needed?
    Update null elements with value in the same location in another is a common use case.
    That is supported in pandas. We should support that as well.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. `DataFrame.combine_first` can be used.
    
    ```py
    >>> ps.set_option("compute.ops_on_diff_frames", True)
    >>> df1 = ps.DataFrame({'A': [None, 0], 'B': [None, 4]})
    >>> df2 = ps.DataFrame({'A': [1, 1], 'B': [3, 3]})
    >>> df1.combine_first(df2).sort_index()
         A    B
    0  1.0  3.0
    1  0.0  4.0
    
    # Null values still persist if the location of that null value does not exist in other
    
    >>> df1 = ps.DataFrame({'A': [None, 0], 'B': [4, None]})
    >>> df2 = ps.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2])
    >>> df1.combine_first(df2).sort_index()
         A    B    C
    0  NaN  4.0  NaN
    1  0.0  3.0  1.0
    2  NaN  3.0  1.0
    >>> ps.reset_option("compute.ops_on_diff_frames")
    ```
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #33714 from xinrong-databricks/df_combine_first.
    
    Authored-by: Xinrong Meng <[email protected]>
    Signed-off-by: Takuya UESHIN <[email protected]>
---
 .../docs/source/reference/pyspark.pandas/frame.rst |  1 +
 python/pyspark/pandas/frame.py                     | 83 ++++++++++++++++++++++
 python/pyspark/pandas/missing/frame.py             |  1 -
 python/pyspark/pandas/tests/test_dataframe.py      | 16 +++++
 .../pandas/tests/test_ops_on_diff_frames.py        | 67 +++++++++++------
 python/pyspark/pandas/tests/test_series.py         | 18 +++++
 6 files changed, 164 insertions(+), 22 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst
index dbeee65..01b83ec 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -100,6 +100,7 @@ Binary operator functions
    DataFrame.ne
    DataFrame.eq
    DataFrame.dot
+   DataFrame.combine_first
 
 Function application, GroupBy & Window
 --------------------------------------
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 48be138..5ef71aa 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -7897,6 +7897,89 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         )
         return join_psdf.reset_index() if need_set_index else join_psdf
 
+    def combine_first(self, other: "DataFrame") -> "DataFrame":
+        """
+        Update null elements with value in the same location in `other`.
+
+        Combine two DataFrame objects by filling null values in one DataFrame
+        with non-null values from other DataFrame. The row and column indexes
+        of the resulting DataFrame will be the union of the two.
+
+        Parameters
+        ----------
+        other : DataFrame
+            Provided DataFrame to use to fill null values.
+
+        Returns
+        -------
+        DataFrame
+
+        Examples
+        --------
+        >>> ps.set_option("compute.ops_on_diff_frames", True)
+        >>> df1 = ps.DataFrame({'A': [None, 0], 'B': [None, 4]})
+        >>> df2 = ps.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+        >>> df1.combine_first(df2).sort_index()
+             A    B
+        0  1.0  3.0
+        1  0.0  4.0
+
+        Null values still persist if the location of that null value does not exist in other
+
+        >>> df1 = ps.DataFrame({'A': [None, 0], 'B': [4, None]})
+        >>> df2 = ps.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2])
+
+        >>> df1.combine_first(df2).sort_index()
+             A    B    C
+        0  NaN  4.0  NaN
+        1  0.0  3.0  1.0
+        2  NaN  3.0  1.0
+        >>> ps.reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, DataFrame):
+            raise TypeError("`combine_first` only allows `DataFrame` for parameter `other`")
+        if same_anchor(self, other):
+            combined = self
+            this = self
+            that = other
+        else:
+            combined = combine_frames(self, other)
+            this = combined["this"]
+            that = combined["that"]
+
+        intersect_column_labels = set(self._internal.column_labels).intersection(
+            set(other._internal.column_labels)
+        )
+
+        column_labels, data_spark_columns = [], []
+        for column_label in this._internal.column_labels:
+            this_scol = this._internal.spark_column_for(column_label)
+            if column_label in intersect_column_labels:
+                that_scol = that._internal.spark_column_for(column_label)
+                this_scol_name = this._internal.spark_column_name_for(column_label)
+                combined_scol = (
+                    F.when(this_scol.isNull(), that_scol).otherwise(this_scol).alias(this_scol_name)
+                )
+                data_spark_columns.append(combined_scol)
+            else:
+                data_spark_columns.append(this_scol)
+            column_labels.append(column_label)
+
+        for column_label in that._internal.column_labels:
+            if column_label not in intersect_column_labels:
+                that_scol = that._internal.spark_column_for(column_label)
+                data_spark_columns.append(that_scol)
+                column_labels.append(column_label)
+
+        internal = combined._internal.copy(
+            column_labels=column_labels,
+            data_spark_columns=data_spark_columns,
+            data_fields=None,  # TODO: dtype?
+            column_label_names=self._internal.column_label_names,
+        )
+        return DataFrame(internal)
+
     def append(
         self,
         other: "DataFrame",
diff --git a/python/pyspark/pandas/missing/frame.py b/python/pyspark/pandas/missing/frame.py
index 3082a2a..f3cc92f 100644
--- a/python/pyspark/pandas/missing/frame.py
+++ b/python/pyspark/pandas/missing/frame.py
@@ -40,7 +40,6 @@ class _MissingPandasLikeDataFrame(object):
     asof = _unsupported_function("asof")
     boxplot = _unsupported_function("boxplot")
     combine = _unsupported_function("combine")
-    combine_first = _unsupported_function("combine_first")
     compare = _unsupported_function("compare")
     convert_dtypes = _unsupported_function("convert_dtypes")
     corrwith = _unsupported_function("corrwith")
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 5fe1165..d6f0a58 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -5726,6 +5726,22 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
         for value_psdf, value_pdf in zip(psdf, pdf):
             self.assert_eq(value_psdf, value_pdf)
 
+    def test_combine_first(self):
+        pdf = pd.DataFrame(
+            {("X", "A"): [None, 0], ("X", "B"): [4, None], ("Y", "C"): [3, 3], ("Y", "B"): [1, 1]}
+        )
+        pdf1, pdf2 = pdf["X"], pdf["Y"]
+        psdf = ps.from_pandas(pdf)
+        psdf1, psdf2 = psdf["X"], psdf["Y"]
+
+        if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
+            self.assert_eq(pdf1.combine_first(pdf2), psdf1.combine_first(psdf2))
+        else:
+            # pandas < 1.2.0 returns unexpected dtypes,
+            # please refer to https://github.com/pandas-dev/pandas/issues/28481 for details
+            expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C": [3, 3]})
+            self.assert_eq(expected_pdf, psdf1.combine_first(psdf2))
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_dataframe import *  # noqa: F401
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
index 12e87b2..f7fa3e8 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
@@ -654,30 +654,35 @@ class OpsOnDiffFramesEnabledTest(PandasOnSparkTestCase, SQLTestUtils):
             psser1.combine_first(psser2).sort_index(), pser1.combine_first(pser2).sort_index()
         )
 
-        # Series come from same DataFrame
-        pdf = pd.DataFrame(
-            {
-                "A": {"falcon": 330.0, "eagle": 160.0},
-                "B": {"falcon": 345.0, "eagle": 200.0, "duck": 30.0},
-            }
-        )
-        pser1 = pdf.A
-        pser2 = pdf.B
-        psser1 = ps.from_pandas(pser1)
-        psser2 = ps.from_pandas(pser2)
+        # DataFrame
+        pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
+        psdf1 = ps.from_pandas(pdf1)
+        pdf2 = pd.DataFrame({"C": [3, 3], "B": [1, 1]})
+        psdf2 = ps.from_pandas(pdf2)
 
-        self.assert_eq(
-            psser1.combine_first(psser2).sort_index(), pser1.combine_first(pser2).sort_index()
-        )
+        if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
+            self.assert_eq(pdf1.combine_first(pdf2), psdf1.combine_first(psdf2).sort_index())
+        else:
+            # pandas < 1.2.0 returns unexpected dtypes,
+            # please refer to https://github.com/pandas-dev/pandas/issues/28481 for details
+            expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C": [3, 3]})
+            self.assert_eq(expected_pdf, psdf1.combine_first(psdf2).sort_index())
 
-        psser1.name = ("X", "A")
-        psser2.name = ("Y", "B")
-        pser1.name = ("X", "A")
-        pser2.name = ("Y", "B")
+        pdf1.columns = pd.MultiIndex.from_tuples([("A", "willow"), ("B", "pine")])
+        psdf1 = ps.from_pandas(pdf1)
+        pdf2.columns = pd.MultiIndex.from_tuples([("C", "oak"), ("B", "pine")])
+        psdf2 = ps.from_pandas(pdf2)
 
-        self.assert_eq(
-            psser1.combine_first(psser2).sort_index(), pser1.combine_first(pser2).sort_index()
-        )
+        if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
+            self.assert_eq(pdf1.combine_first(pdf2), psdf1.combine_first(psdf2).sort_index())
+        else:
+            # pandas < 1.2.0 returns unexpected dtypes,
+            # please refer to https://github.com/pandas-dev/pandas/issues/28481 for details
+            expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C": [3, 3]})
+            expected_pdf.columns = pd.MultiIndex.from_tuples(
+                [("A", "willow"), ("B", "pine"), ("C", "oak")]
+            )
+            self.assert_eq(expected_pdf, psdf1.combine_first(psdf2).sort_index())
 
     def test_insert(self):
         #
@@ -1955,6 +1960,26 @@ class OpsOnDiffFramesDisabledTest(PandasOnSparkTestCase, SQLTestUtils):
         with self.assertRaisesRegex(ValueError, "Cannot combine the series or dataframe"):
             psser.rpow(psser_other)
 
+    def test_combine_first(self):
+        pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
+        psdf1 = ps.from_pandas(pdf1)
+
+        self.assertRaises(TypeError, lambda: psdf1.combine_first(ps.Series([1, 2])))
+
+        pser1 = pd.Series({"falcon": 330.0, "eagle": 160.0})
+        pser2 = pd.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0})
+        psser1 = ps.from_pandas(pser1)
+        psser2 = ps.from_pandas(pser2)
+        with self.assertRaisesRegex(ValueError, "Cannot combine the series or dataframe"):
+            psser1.combine_first(psser2)
+
+        pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
+        psdf1 = ps.from_pandas(pdf1)
+        pdf2 = pd.DataFrame({"C": [3, 3], "B": [1, 1]})
+        psdf2 = ps.from_pandas(pdf2)
+        with self.assertRaisesRegex(ValueError, "Cannot combine the series or dataframe"):
+            psdf1.combine_first(psdf2)
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_ops_on_diff_frames import *  # noqa: F401
diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py
index 58c87ed..a861dd0 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -2887,6 +2887,24 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
             psser.at_time("0:20").sort_index(),
         )
 
+    def test_combine_first(self):
+        pdf = pd.DataFrame(
+            {
+                "A": {"falcon": 330.0, "eagle": 160.0},
+                "B": {"falcon": 345.0, "eagle": 200.0, "duck": 30.0},
+            }
+        )
+        pser1, pser2 = pdf.A, pdf.B
+        psdf = ps.from_pandas(pdf)
+        psser1, psser2 = psdf.A, psdf.B
+
+        self.assert_eq(psser1.combine_first(psser2), pser1.combine_first(pser2))
+
+        psser1.name = pser1.name = ("X", "A")
+        psser2.name = pser2.name = ("Y", "B")
+
+        self.assert_eq(psser1.combine_first(psser2), pser1.combine_first(pser2))
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_series import *  # noqa: F401

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to