This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c6a2021 [SPARK-36399][PYTHON] Implement DataFrame.combine_first
c6a2021 is described below
commit c6a2021fec5bab9069fbfba33f75d4415ea76e99
Author: Xinrong Meng <[email protected]>
AuthorDate: Tue Aug 31 19:50:16 2021 -0700
[SPARK-36399][PYTHON] Implement DataFrame.combine_first
### What changes were proposed in this pull request?
Implement `DataFrame.combine_first`.
The PR is based on https://github.com/databricks/koalas/pull/1950. Thanks to
AishwaryaKalloli for the prototype.
### Why are the changes needed?
Updating null elements with the value in the same location in another DataFrame
is a common use case.
It is supported in pandas, so we should support it as well.
### Does this PR introduce _any_ user-facing change?
Yes. `DataFrame.combine_first` can be used.
```py
>>> ps.set_option("compute.ops_on_diff_frames", True)
>>> df1 = ps.DataFrame({'A': [None, 0], 'B': [None, 4]})
>>> df2 = ps.DataFrame({'A': [1, 1], 'B': [3, 3]})
>>> df1.combine_first(df2).sort_index()
A B
0 1.0 3.0
1 0.0 4.0
# Null values still persist if the location of that null value does not
exist in other
>>> df1 = ps.DataFrame({'A': [None, 0], 'B': [4, None]})
>>> df2 = ps.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2])
>>> df1.combine_first(df2).sort_index()
A B C
0 NaN 4.0 NaN
1 0.0 3.0 1.0
2 NaN 3.0 1.0
>>> ps.reset_option("compute.ops_on_diff_frames")
```
### How was this patch tested?
Unit tests.
Closes #33714 from xinrong-databricks/df_combine_first.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
---
.../docs/source/reference/pyspark.pandas/frame.rst | 1 +
python/pyspark/pandas/frame.py | 83 ++++++++++++++++++++++
python/pyspark/pandas/missing/frame.py | 1 -
python/pyspark/pandas/tests/test_dataframe.py | 16 +++++
.../pandas/tests/test_ops_on_diff_frames.py | 67 +++++++++++------
python/pyspark/pandas/tests/test_series.py | 18 +++++
6 files changed, 164 insertions(+), 22 deletions(-)
diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst
b/python/docs/source/reference/pyspark.pandas/frame.rst
index dbeee65..01b83ec 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -100,6 +100,7 @@ Binary operator functions
DataFrame.ne
DataFrame.eq
DataFrame.dot
+ DataFrame.combine_first
Function application, GroupBy & Window
--------------------------------------
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 48be138..5ef71aa 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -7897,6 +7897,89 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
)
return join_psdf.reset_index() if need_set_index else join_psdf
+ def combine_first(self, other: "DataFrame") -> "DataFrame":
+ """
+ Update null elements with value in the same location in `other`.
+
+ Combine two DataFrame objects by filling null values in one DataFrame
+ with non-null values from other DataFrame. The row and column indexes
+ of the resulting DataFrame will be the union of the two.
+
+ Parameters
+ ----------
+ other : DataFrame
+ Provided DataFrame to use to fill null values.
+
+ Returns
+ -------
+ DataFrame
+
+ Examples
+ --------
+ >>> ps.set_option("compute.ops_on_diff_frames", True)
+ >>> df1 = ps.DataFrame({'A': [None, 0], 'B': [None, 4]})
+ >>> df2 = ps.DataFrame({'A': [1, 1], 'B': [3, 3]})
+
+ >>> df1.combine_first(df2).sort_index()
+ A B
+ 0 1.0 3.0
+ 1 0.0 4.0
+
+ Null values still persist if the location of that null value does not
exist in other
+
+ >>> df1 = ps.DataFrame({'A': [None, 0], 'B': [4, None]})
+ >>> df2 = ps.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2])
+
+ >>> df1.combine_first(df2).sort_index()
+ A B C
+ 0 NaN 4.0 NaN
+ 1 0.0 3.0 1.0
+ 2 NaN 3.0 1.0
+ >>> ps.reset_option("compute.ops_on_diff_frames")
+ """
+ if not isinstance(other, DataFrame):
+ raise TypeError("`combine_first` only allows `DataFrame` for
parameter `other`")
+ if same_anchor(self, other):
+ combined = self
+ this = self
+ that = other
+ else:
+ combined = combine_frames(self, other)
+ this = combined["this"]
+ that = combined["that"]
+
+ intersect_column_labels =
set(self._internal.column_labels).intersection(
+ set(other._internal.column_labels)
+ )
+
+ column_labels, data_spark_columns = [], []
+ for column_label in this._internal.column_labels:
+ this_scol = this._internal.spark_column_for(column_label)
+ if column_label in intersect_column_labels:
+ that_scol = that._internal.spark_column_for(column_label)
+ this_scol_name =
this._internal.spark_column_name_for(column_label)
+ combined_scol = (
+ F.when(this_scol.isNull(),
that_scol).otherwise(this_scol).alias(this_scol_name)
+ )
+ data_spark_columns.append(combined_scol)
+ else:
+ data_spark_columns.append(this_scol)
+ column_labels.append(column_label)
+
+ for column_label in that._internal.column_labels:
+ if column_label not in intersect_column_labels:
+ that_scol = that._internal.spark_column_for(column_label)
+ data_spark_columns.append(that_scol)
+ column_labels.append(column_label)
+
+ internal = combined._internal.copy(
+ column_labels=column_labels,
+ data_spark_columns=data_spark_columns,
+ data_fields=None, # TODO: dtype?
+ column_label_names=self._internal.column_label_names,
+ )
+ return DataFrame(internal)
+
def append(
self,
other: "DataFrame",
diff --git a/python/pyspark/pandas/missing/frame.py
b/python/pyspark/pandas/missing/frame.py
index 3082a2a..f3cc92f 100644
--- a/python/pyspark/pandas/missing/frame.py
+++ b/python/pyspark/pandas/missing/frame.py
@@ -40,7 +40,6 @@ class _MissingPandasLikeDataFrame(object):
asof = _unsupported_function("asof")
boxplot = _unsupported_function("boxplot")
combine = _unsupported_function("combine")
- combine_first = _unsupported_function("combine_first")
compare = _unsupported_function("compare")
convert_dtypes = _unsupported_function("convert_dtypes")
corrwith = _unsupported_function("corrwith")
diff --git a/python/pyspark/pandas/tests/test_dataframe.py
b/python/pyspark/pandas/tests/test_dataframe.py
index 5fe1165..d6f0a58 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -5726,6 +5726,22 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
for value_psdf, value_pdf in zip(psdf, pdf):
self.assert_eq(value_psdf, value_pdf)
+ def test_combine_first(self):
+ pdf = pd.DataFrame(
+ {("X", "A"): [None, 0], ("X", "B"): [4, None], ("Y", "C"): [3, 3],
("Y", "B"): [1, 1]}
+ )
+ pdf1, pdf2 = pdf["X"], pdf["Y"]
+ psdf = ps.from_pandas(pdf)
+ psdf1, psdf2 = psdf["X"], psdf["Y"]
+
+ if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
+ self.assert_eq(pdf1.combine_first(pdf2),
psdf1.combine_first(psdf2))
+ else:
+ # pandas < 1.2.0 returns unexpected dtypes,
+ # please refer to
https://github.com/pandas-dev/pandas/issues/28481 for details
+ expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C":
[3, 3]})
+ self.assert_eq(expected_pdf, psdf1.combine_first(psdf2))
+
if __name__ == "__main__":
from pyspark.pandas.tests.test_dataframe import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
index 12e87b2..f7fa3e8 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
@@ -654,30 +654,35 @@ class OpsOnDiffFramesEnabledTest(PandasOnSparkTestCase,
SQLTestUtils):
psser1.combine_first(psser2).sort_index(),
pser1.combine_first(pser2).sort_index()
)
- # Series come from same DataFrame
- pdf = pd.DataFrame(
- {
- "A": {"falcon": 330.0, "eagle": 160.0},
- "B": {"falcon": 345.0, "eagle": 200.0, "duck": 30.0},
- }
- )
- pser1 = pdf.A
- pser2 = pdf.B
- psser1 = ps.from_pandas(pser1)
- psser2 = ps.from_pandas(pser2)
+ # DataFrame
+ pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
+ psdf1 = ps.from_pandas(pdf1)
+ pdf2 = pd.DataFrame({"C": [3, 3], "B": [1, 1]})
+ psdf2 = ps.from_pandas(pdf2)
- self.assert_eq(
- psser1.combine_first(psser2).sort_index(),
pser1.combine_first(pser2).sort_index()
- )
+ if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
+ self.assert_eq(pdf1.combine_first(pdf2),
psdf1.combine_first(psdf2).sort_index())
+ else:
+ # pandas < 1.2.0 returns unexpected dtypes,
+ # please refer to
https://github.com/pandas-dev/pandas/issues/28481 for details
+ expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C":
[3, 3]})
+ self.assert_eq(expected_pdf,
psdf1.combine_first(psdf2).sort_index())
- psser1.name = ("X", "A")
- psser2.name = ("Y", "B")
- pser1.name = ("X", "A")
- pser2.name = ("Y", "B")
+ pdf1.columns = pd.MultiIndex.from_tuples([("A", "willow"), ("B",
"pine")])
+ psdf1 = ps.from_pandas(pdf1)
+ pdf2.columns = pd.MultiIndex.from_tuples([("C", "oak"), ("B", "pine")])
+ psdf2 = ps.from_pandas(pdf2)
- self.assert_eq(
- psser1.combine_first(psser2).sort_index(),
pser1.combine_first(pser2).sort_index()
- )
+ if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
+ self.assert_eq(pdf1.combine_first(pdf2),
psdf1.combine_first(psdf2).sort_index())
+ else:
+ # pandas < 1.2.0 returns unexpected dtypes,
+ # please refer to
https://github.com/pandas-dev/pandas/issues/28481 for details
+ expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C":
[3, 3]})
+ expected_pdf.columns = pd.MultiIndex.from_tuples(
+ [("A", "willow"), ("B", "pine"), ("C", "oak")]
+ )
+ self.assert_eq(expected_pdf,
psdf1.combine_first(psdf2).sort_index())
def test_insert(self):
#
@@ -1955,6 +1960,26 @@ class OpsOnDiffFramesDisabledTest(PandasOnSparkTestCase,
SQLTestUtils):
with self.assertRaisesRegex(ValueError, "Cannot combine the series or
dataframe"):
psser.rpow(psser_other)
+ def test_combine_first(self):
+ pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
+ psdf1 = ps.from_pandas(pdf1)
+
+ self.assertRaises(TypeError, lambda: psdf1.combine_first(ps.Series([1,
2])))
+
+ pser1 = pd.Series({"falcon": 330.0, "eagle": 160.0})
+ pser2 = pd.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0})
+ psser1 = ps.from_pandas(pser1)
+ psser2 = ps.from_pandas(pser2)
+ with self.assertRaisesRegex(ValueError, "Cannot combine the series or
dataframe"):
+ psser1.combine_first(psser2)
+
+ pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
+ psdf1 = ps.from_pandas(pdf1)
+ pdf2 = pd.DataFrame({"C": [3, 3], "B": [1, 1]})
+ psdf2 = ps.from_pandas(pdf2)
+ with self.assertRaisesRegex(ValueError, "Cannot combine the series or
dataframe"):
+ psdf1.combine_first(psdf2)
+
if __name__ == "__main__":
from pyspark.pandas.tests.test_ops_on_diff_frames import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/test_series.py
b/python/pyspark/pandas/tests/test_series.py
index 58c87ed..a861dd0 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -2887,6 +2887,24 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
psser.at_time("0:20").sort_index(),
)
+ def test_combine_first(self):
+ pdf = pd.DataFrame(
+ {
+ "A": {"falcon": 330.0, "eagle": 160.0},
+ "B": {"falcon": 345.0, "eagle": 200.0, "duck": 30.0},
+ }
+ )
+ pser1, pser2 = pdf.A, pdf.B
+ psdf = ps.from_pandas(pdf)
+ psser1, psser2 = psdf.A, psdf.B
+
+ self.assert_eq(psser1.combine_first(psser2),
pser1.combine_first(pser2))
+
+ psser1.name = pser1.name = ("X", "A")
+ psser2.name = pser2.name = ("Y", "B")
+
+ self.assert_eq(psser1.combine_first(psser2),
pser1.combine_first(pser2))
+
if __name__ == "__main__":
from pyspark.pandas.tests.test_series import * # noqa: F401
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]