This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8269bcbe152 [SPARK-38907][PYTHON] Implement DataFrame.corrwith
8269bcbe152 is described below

commit 8269bcbe152da178c4c9bd8b3f745754c0a510d7
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Wed Apr 27 10:13:16 2022 +0900

    [SPARK-38907][PYTHON] Implement DataFrame.corrwith
    
    ### What changes were proposed in this pull request?
    Implement `DataFrame.corrwith`.
    
    - parameters `axis` and `numeric_only` are not supported;
    - only the default `pearson` correlation method is supported (see the sketch below).
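
    Under the hood, each returned value is a per-column Pearson correlation computed
    with `F.corr` over double-cast columns. A minimal standalone sketch of that
    building block (the `this_A`/`that_A` names and the SparkSession setup are
    illustrative, not part of this patch; the data is column `A` of the two frames
    in the example below):

    ```
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    # Column "A" of df1 paired with column "A" of df2 from the example below,
    # cast to double exactly as corrwith does before calling F.corr.
    sdf = spark.createDataFrame(
        [(1.0, 5.0), (5.0, 3.0), (7.0, 6.0), (8.0, 4.0)], ["this_A", "that_A"]
    )
    sdf.select(
        F.corr(sdf.this_A.cast("double"), sdf.that_A.cast("double")).alias("A")
    ).show()  # one row, approximately -0.041703
    ```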
    
    ### Why are the changes needed?
    Increase pandas API coverage in PySpark.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a new function is added:
    
    ```
    In [4]: ps.set_option("compute.ops_on_diff_frames", True)
    
    In [5]: df1 = ps.DataFrame({"A":[1, 5, 7, 8],  "X":[5, 8, 4, 3],  "C":[10, 4, 9, 3]})
    
    In [6]: df2 = ps.DataFrame({"A":[5, 3, 6, 4],  "B":[11, 2, 4, 3], "C":[4, 3, 8, 5]})
    
    In [7]: df1.corrwith(df2)
    Out[7]:
    A   -0.041703
    C    0.395437
    X         NaN
    B         NaN
    dtype: float64
    
    In [8]: df1.corrwith(df2.B)
    Out[8]:
    A   -0.844007
    X   -0.151186
    C    0.767234
    dtype: float64
    ```
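
    Based on the `drop` parameter added in this patch (labels that are not shared
    between the two objects are appended as NaN only when `drop=False`), `drop=True`
    should return just the intersecting numeric columns. A hedged sketch with the
    same frames, with the expected output inferred from Out[7] above:

    ```
    import pyspark.pandas as ps

    ps.set_option("compute.ops_on_diff_frames", True)
    df1 = ps.DataFrame({"A": [1, 5, 7, 8], "X": [5, 8, 4, 3], "C": [10, 4, 9, 3]})
    df2 = ps.DataFrame({"A": [5, 3, 6, 4], "B": [11, 2, 4, 3], "C": [4, 3, 8, 5]})

    # drop=True omits the NaN entries for the non-shared labels X and B,
    # keeping only the shared numeric columns A and C.
    print(df1.corrwith(df2, drop=True))
    # A   -0.041703
    # C    0.395437
    # dtype: float64
    ```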
    
    ### How was this patch tested?
    Added unit tests.
    
    Closes #36205 from zhengruifeng/impl_corr_with.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../docs/source/reference/pyspark.pandas/frame.rst |   1 +
 .../pandas_on_spark/supported_pandas_api.rst       |   2 +-
 python/pyspark/pandas/frame.py                     | 151 +++++++++++++++++++++
 python/pyspark/pandas/missing/frame.py             |   1 -
 python/pyspark/pandas/tests/test_dataframe.py      |  27 ++++
 .../pandas/tests/test_ops_on_diff_frames.py        |  29 ++++
 6 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst
index 9635115b0d7..05c215110c6 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -147,6 +147,7 @@ Computations / Descriptive Stats
    DataFrame.any
    DataFrame.clip
    DataFrame.corr
+   DataFrame.corrwith
    DataFrame.count
    DataFrame.cov
    DataFrame.describe
diff --git a/python/docs/source/user_guide/pandas_on_spark/supported_pandas_api.rst b/python/docs/source/user_guide/pandas_on_spark/supported_pandas_api.rst
index efcc55c7178..450742a20f7 100644
--- a/python/docs/source/user_guide/pandas_on_spark/supported_pandas_api.rst
+++ b/python/docs/source/user_guide/pandas_on_spark/supported_pandas_api.rst
@@ -121,7 +121,7 @@ Supported DataFrame APIs
 +--------------------------------------------+-------------+--------------------------------------+
 | :func:`corr`                               | P           | ``min_periods``                      |
 +--------------------------------------------+-------------+--------------------------------------+
-| corrwith                                   | N           |                                      |
+| corrwith                                   | P           | ``axis``                             |
 +--------------------------------------------+-------------+--------------------------------------+
 | :func:`count`                              | P           | ``level``                            |
 +--------------------------------------------+-------------+--------------------------------------+
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 16f8e786b0f..9880e2a18d8 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -1310,6 +1310,157 @@ class DataFrame(Frame, Generic[T]):
         """
         return cast(DataFrame, ps.from_pandas(corr(self, method)))
 
+    # TODO: add axis parameter and support more methods
+    def corrwith(
+        self, other: DataFrameOrSeries, drop: bool = False, method: str = "pearson"
+    ) -> "Series":
+        """
+        Compute pairwise correlation.
+
+        Pairwise correlation is computed between rows or columns of
+        DataFrame with rows or columns of Series or DataFrame. DataFrames
+        are first aligned along both axes before computing the
+        correlations.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        other : DataFrame, Series
+            Object with which to compute correlations.
+
+        drop : bool, default False
+            Drop missing indices from result.
+
+        method : str, default 'pearson'
+            Method of correlation, one of:
+
+            * pearson : standard correlation coefficient
+
+        Returns
+        -------
+        Series
+            Pairwise correlations.
+
+        See Also
+        --------
+        DataFrame.corr : Compute pairwise correlation of columns.
+
+        Examples
+        --------
+        >>> df1 = ps.DataFrame({
+        ...         "A":[1, 5, 7, 8],
+        ...         "X":[5, 8, 4, 3],
+        ...         "C":[10, 4, 9, 3]})
+        >>> df1.corrwith(df1[["X", "C"]])
+        X    1.0
+        C    1.0
+        A    NaN
+        dtype: float64
+
+        >>> df2 = ps.DataFrame({
+        ...         "A":[5, 3, 6, 4],
+        ...         "B":[11, 2, 4, 3],
+        ...         "C":[4, 3, 8, 5]})
+
+        >>> with ps.option_context("compute.ops_on_diff_frames", True):
+        ...     df1.corrwith(df2)
+        A   -0.041703
+        C    0.395437
+        X         NaN
+        B         NaN
+        dtype: float64
+
+        >>> with ps.option_context("compute.ops_on_diff_frames", True):
+        ...     df2.corrwith(df1.X)
+        A   -0.597614
+        B   -0.151186
+        C   -0.642857
+        dtype: float64
+        """
+        from pyspark.pandas.series import Series, first_series
+
+        if (method is not None) and (method not in ["pearson"]):
+            raise NotImplementedError("corrwith currently works only for method='pearson'")
+        if not isinstance(other, (DataFrame, Series)):
+            raise TypeError("unsupported type: {}".format(type(other).__name__))
+
+        right_is_series = isinstance(other, Series)
+
+        if same_anchor(self, other):
+            combined = self
+            this = self
+            that = other
+        else:
+            combined = combine_frames(self, other, how="inner")
+            this = combined["this"]
+            that = combined["that"]
+
+        this_numeric_column_labels: List[Label] = []
+        for column_label in this._internal.column_labels:
+            if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType)):
+                this_numeric_column_labels.append(column_label)
+
+        that_numeric_column_labels: List[Label] = []
+        for column_label in that._internal.column_labels:
+            if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType)):
+                that_numeric_column_labels.append(column_label)
+
+        intersect_numeric_column_labels: List[Label] = []
+        diff_numeric_column_labels: List[Label] = []
+        corr_scols = []
+        if right_is_series:
+            intersect_numeric_column_labels = this_numeric_column_labels
+            that_scol = that._internal.spark_column_for(that_numeric_column_labels[0])
+            for numeric_column_label in intersect_numeric_column_labels:
+                this_scol = this._internal.spark_column_for(numeric_column_label)
+                corr_scols.append(
+                    F.corr(this_scol.cast("double"), that_scol.cast("double")).alias(
+                        name_like_string(numeric_column_label)
+                    )
+                )
+        else:
+            for numeric_column_label in this_numeric_column_labels:
+                if numeric_column_label in that_numeric_column_labels:
+                    intersect_numeric_column_labels.append(numeric_column_label)
+                else:
+                    diff_numeric_column_labels.append(numeric_column_label)
+            for numeric_column_label in that_numeric_column_labels:
+                if numeric_column_label not in this_numeric_column_labels:
+                    diff_numeric_column_labels.append(numeric_column_label)
+            for numeric_column_label in intersect_numeric_column_labels:
+                this_scol = this._internal.spark_column_for(numeric_column_label)
+                that_scol = that._internal.spark_column_for(numeric_column_label)
+                corr_scols.append(
+                    F.corr(this_scol.cast("double"), that_scol.cast("double")).alias(
+                        name_like_string(numeric_column_label)
+                    )
+                )
+
+        corr_labels: List[Label] = intersect_numeric_column_labels
+        if not drop:
+            for numeric_column_label in diff_numeric_column_labels:
+                corr_scols.append(
+                    SF.lit(None).cast("double").alias(name_like_string(numeric_column_label))
+                )
+                corr_labels.append(numeric_column_label)
+
+        sdf = combined._internal.spark_frame.select(
+            *[SF.lit(None).cast(StringType()).alias(SPARK_DEFAULT_INDEX_NAME)], *corr_scols
+        ).limit(
+            1
+        )  # limit(1) to avoid returning more than 1 row when intersection is empty
+
+        # The data is expected to be small so it's fine to transpose/use default index.
+        with ps.option_context("compute.max_rows", 1):
+            internal = InternalFrame(
+                spark_frame=sdf,
+                index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)],
+                column_labels=corr_labels,
+                column_label_names=self._internal.column_label_names,
+            )
+            return first_series(DataFrame(internal).transpose())
+
     def iteritems(self) -> Iterator[Tuple[Name, "Series"]]:
         """
         Iterator over (column name, Series) pairs.
diff --git a/python/pyspark/pandas/missing/frame.py b/python/pyspark/pandas/missing/frame.py
index 23fb06f03ce..ba2d01c5225 100644
--- a/python/pyspark/pandas/missing/frame.py
+++ b/python/pyspark/pandas/missing/frame.py
@@ -40,7 +40,6 @@ class _MissingPandasLikeDataFrame:
     combine = _unsupported_function("combine")
     compare = _unsupported_function("compare")
     convert_dtypes = _unsupported_function("convert_dtypes")
-    corrwith = _unsupported_function("corrwith")
     infer_objects = _unsupported_function("infer_objects")
     mode = _unsupported_function("mode")
     reorder_levels = _unsupported_function("reorder_levels")
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 008da92c9a9..8915ec1ca64 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -5547,6 +5547,33 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
         self.assert_eq(abs(psdf), abs(pdf))
         self.assert_eq(np.abs(psdf), np.abs(pdf))
 
+    def test_corrwith(self):
+        df1 = ps.DataFrame({"A": [1, np.nan, 7, 8], "X": [5, 8, np.nan, 3], "C": [10, 4, 9, 3]})
+        df2 = df1[["A", "C"]]
+        self._test_corrwith(df1, df2)
+        self._test_corrwith((df1 + 1), df2.A)
+        self._test_corrwith((df1 + 1), (df2.C + 2))
+
+        with self.assertRaisesRegex(
+            NotImplementedError, "corrwith currently works only for method='pearson'"
+        ):
+            df1.corrwith(df2, method="kendall")
+
+        with self.assertRaisesRegex(TypeError, "unsupported type"):
+            df1.corrwith(123)
+
+        df_bool = ps.DataFrame({"A": [True, True, False, False], "B": [True, False, False, True]})
+        self._test_corrwith(df_bool, df_bool.A)
+        self._test_corrwith(df_bool, df_bool.B)
+
+    def _test_corrwith(self, psdf, psobj):
+        pdf = psdf.to_pandas()
+        pobj = psobj.to_pandas()
+        for drop in [True, False]:
+            p_corr = pdf.corrwith(pobj, drop=drop)
+            ps_corr = psdf.corrwith(psobj, drop=drop)
+            self.assert_eq(p_corr.sort_index(), ps_corr.sort_index(), almost=True)
+
     def test_iteritems(self):
         pdf = pd.DataFrame(
             {"species": ["bear", "bear", "marsupial"], "population": [1864, 
22000, 80000]},
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
index 96473769475..3ef3c676ad8 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
@@ -1842,6 +1842,35 @@ class OpsOnDiffFramesEnabledTest(PandasOnSparkTestCase, SQLTestUtils):
         pscov = psser1.cov(psser2, min_periods=3)
         self.assert_eq(pcov, pscov, almost=True)
 
+    def test_corrwith(self):
+        df1 = ps.DataFrame({"A": [1, np.nan, 7, 8], "X": [5, 8, np.nan, 3], "C": [10, 4, 9, 3]})
+        df2 = ps.DataFrame({"A": [5, 3, 6, 4], "B": [11, 2, 4, 3], "C": [4, 3, 8, np.nan]})
+        self._test_corrwith(df1, df2)
+        self._test_corrwith((df1 + 1), df2.B)
+        self._test_corrwith((df1 + 1), (df2.B + 2))
+
+        df_bool = ps.DataFrame({"A": [True, True, False, False], "B": [True, False, False, True]})
+        ser_bool = ps.Series([True, True, False, True])
+        self._test_corrwith(df_bool, ser_bool)
+
+        self._test_corrwith(self.psdf1, self.psdf1)
+        self._test_corrwith(self.psdf1, self.psdf2)
+        self._test_corrwith(self.psdf2, self.psdf3)
+        self._test_corrwith(self.psdf3, self.psdf4)
+
+        self._test_corrwith(self.psdf1, self.psdf1.a)
+        self._test_corrwith(self.psdf1, self.psdf2.b)
+        self._test_corrwith(self.psdf2, self.psdf3.c)
+        self._test_corrwith(self.psdf3, self.psdf4.f)
+
+    def _test_corrwith(self, psdf, psobj):
+        pdf = psdf.to_pandas()
+        pobj = psobj.to_pandas()
+        for drop in [True, False]:
+            p_corr = pdf.corrwith(pobj, drop=drop)
+            ps_corr = psdf.corrwith(psobj, drop=drop)
+            self.assert_eq(p_corr.sort_index(), ps_corr.sort_index(), almost=True)
+
     def test_series_eq(self):
         pser = pd.Series([1, 2, 3, 4, 5, 6], name="x")
         psser = ps.from_pandas(pser)

