This is an automated email from the ASF dual-hosted git repository.
HyukjinKwon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7041d1eb72e0 [SPARK-40336][PS] Implement DataFrameGroupBy.cov
7041d1eb72e0 is described below
commit 7041d1eb72e048545d94b37be05fa6b3c5af8586
Author: Devin Petersohn <[email protected]>
AuthorDate: Mon May 11 06:54:03 2026 +0900
[SPARK-40336][PS] Implement DataFrameGroupBy.cov
### What changes were proposed in this pull request?
Implement `DataFrameGroupBy.cov(min_periods, ddof, numeric_only)` in the
pandas API on Spark.
### Why are the changes needed?
Missing API coverage.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
Closes #55775 from devin-petersohn/devin/groupby-cov.
Authored-by: Devin Petersohn <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
dev/sparktestsupport/modules.py | 2 +
.../source/reference/pyspark.pandas/groupby.rst | 1 +
python/pyspark/pandas/groupby.py | 210 +++++++++++++++++++++
python/pyspark/pandas/missing/groupby.py | 1 -
.../tests/connect/groupby/test_parity_cov.py | 34 ++++
python/pyspark/pandas/tests/groupby/test_cov.py | 124 ++++++++++++
6 files changed, 371 insertions(+), 1 deletion(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 664b0d81840a..693d43f10f57 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1045,6 +1045,7 @@ pyspark_pandas_slow = Module(
"pyspark.pandas.tests.groupby.test_aggregate",
"pyspark.pandas.tests.groupby.test_apply_func",
"pyspark.pandas.tests.groupby.test_corr",
+ "pyspark.pandas.tests.groupby.test_cov",
"pyspark.pandas.tests.groupby.test_cumulative",
"pyspark.pandas.tests.groupby.test_describe",
"pyspark.pandas.tests.groupby.test_groupby",
@@ -1477,6 +1478,7 @@ pyspark_pandas_slow_connect = Module(
"pyspark.pandas.tests.connect.groupby.test_parity_aggregate",
"pyspark.pandas.tests.connect.groupby.test_parity_apply_func",
"pyspark.pandas.tests.connect.groupby.test_parity_corr",
+ "pyspark.pandas.tests.connect.groupby.test_parity_cov",
"pyspark.pandas.tests.connect.groupby.test_parity_cumulative",
"pyspark.pandas.tests.connect.groupby.test_parity_missing_data",
"pyspark.pandas.tests.connect.groupby.test_parity_split_apply",
diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst
index f86a7572666b..02b9174ad0ab 100644
--- a/python/docs/source/reference/pyspark.pandas/groupby.rst
+++ b/python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -97,6 +97,7 @@ The following methods are available only for `DataFrameGroupBy` objects.
.. autosummary::
:toctree: api/
+ DataFrameGroupBy.cov
DataFrameGroupBy.describe
The following methods are available only for `SeriesGroupBy` objects.
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index c5b81f05cc57..2cfa1ae2316b 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -4303,6 +4303,216 @@ class DataFrameGroupBy(GroupBy[DataFrame]):
)
)
+ @with_ansi_mode_context
+ def cov(
+ self,
+ min_periods: Optional[int] = None,
+ ddof: int = 1,
+ numeric_only: bool = False,
+ ) -> "DataFrame":
+ """
+ Compute pairwise covariance of columns, excluding NA/null values.
+
+ The returned DataFrame is the covariance matrix of the columns
+ of the DataFrame within each group.
+
+ Both NA and null values are automatically excluded from the
+ calculation. A threshold can be set for the minimum number of
+ observations for each value created. Comparisons with observations
+ below this threshold will be returned as ``NaN``.
+
+ .. versionadded:: 4.3.0
+
+ Parameters
+ ----------
+ min_periods : int, optional
+ Minimum number of observations required per pair of columns
+ to have a valid result.
+
+ ddof : int, default 1
+ Delta degrees of freedom. The divisor used in calculations
+ is ``N - ddof``, where ``N`` represents the number of elements.
+
+ numeric_only : bool, default False
+ Include only `float`, `int` or `boolean` data.
+
+ Returns
+ -------
+ DataFrame
+ The covariance matrix of the series of the DataFrame within each group.
+
+ See Also
+ --------
+ DataFrame.cov
+ Series.cov
+
+ Examples
+ --------
+ >>> df = ps.DataFrame(
+ ... {"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]},
+ ... columns=["A", "B", "C"])
+ >>> df.groupby("A").cov().sort_index()
+ B C
+ A
+ 1 B 0.5 1.0
+ C 1.0 2.0
+ 2 B 1.0 2.0
+ C 2.0 4.0
+
+ >>> df.groupby("A").cov(ddof=0).sort_index()
+ B C
+ A
+ 1 B 0.250000 0.500000
+ C 0.500000 1.000000
+ 2 B 0.666667 1.333333
+ C 1.333333 2.666667
+
+ >>> df.groupby("A").cov(min_periods=3).sort_index()
+ B C
+ A
+ 1 B NaN NaN
+ C NaN NaN
+ 2 B 1.0 2.0
+ C 2.0 4.0
+ """
+ if not isinstance(ddof, int):
+ raise TypeError("ddof must be integer")
+ if LooseVersion(pd.__version__) >= "3.0.0":
+ if not isinstance(numeric_only, bool):
+ raise ValueError("numeric_only accepts only Boolean values")
+ min_periods = 1 if min_periods is None else min_periods
+
+ groupkey_names: List[str] = [str(key.name) for key in self._groupkeys]
+ internal, _, _ = self._prepare_reduce(
+ groupkey_names=groupkey_names,
+ accepted_spark_types=(NumericType, BooleanType) if numeric_only
else None,
+ bool_to_numeric=False,
+ )
+
+ numeric_labels = [
+ label
+ for label in internal.column_labels
+ if isinstance(internal.spark_type_for(label), (NumericType,
BooleanType))
+ ]
+ numeric_scols: List[Column] = [
+ internal.spark_column_for(label).cast("double") for label in
numeric_labels
+ ]
+ numeric_col_names: List[str] = [name_like_string(label) for label in
numeric_labels]
+ num_scols = len(numeric_scols)
+
+ sdf = internal.spark_frame
+ index_1_col_name = verify_temp_column_name(sdf,
"__groupby_cov_index_1_temp_column__")
+ index_2_col_name = verify_temp_column_name(sdf,
"__groupby_cov_index_2_temp_column__")
+ value_1_col_name = verify_temp_column_name(sdf,
"__groupby_cov_value_1_temp_column__")
+ value_2_col_name = verify_temp_column_name(sdf,
"__groupby_cov_value_2_temp_column__")
+ cov_output_col_name = verify_temp_column_name(sdf,
"__groupby_cov_output_temp_column__")
+ count_output_col_name = verify_temp_column_name(sdf,
"__groupby_cov_count_temp_column__")
+
+ pair_scols: List[Column] = []
+ for i in range(0, num_scols):
+ for j in range(i, num_scols):
+ pair_scols.append(
+ F.struct(
+ F.lit(i).alias(index_1_col_name),
+ F.lit(j).alias(index_2_col_name),
+ numeric_scols[i].alias(value_1_col_name),
+ numeric_scols[j].alias(value_2_col_name),
+ )
+ )
+
+ sdf = sdf.select(*[F.col(key) for key in groupkey_names],
*[F.inline(F.array(*pair_scols))])
+
+ # Null both values when either is null so pairwise non-null counts are accurate.
+ sdf = sdf.select(
+ *[F.col(key) for key in groupkey_names + [index_1_col_name,
index_2_col_name]],
+ F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name),
F.lit(None))
+ .otherwise(F.col(value_1_col_name))
+ .alias(value_1_col_name),
+ F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name),
F.lit(None))
+ .otherwise(F.col(value_2_col_name))
+ .alias(value_2_col_name),
+ )
+
+ sdf = sdf.groupby(groupkey_names + [index_1_col_name,
index_2_col_name]).agg(
+ SF.covar(F.col(value_1_col_name), F.col(value_2_col_name),
ddof).alias(
+ cov_output_col_name
+ ),
+ F.count(F.when(~F.isnull(value_1_col_name),
1)).alias(count_output_col_name),
+ )
+
+ sdf = sdf.withColumn(
+ cov_output_col_name,
+ F.when(F.col(count_output_col_name) < min_periods,
F.lit(None)).otherwise(
+ F.col(cov_output_col_name)
+ ),
+ )
+
+ # Mirror the (i, j) pair to (j, i) to fill in the lower triangle of the matrix.
+ auxiliary_col_name = verify_temp_column_name(sdf,
"__groupby_cov_auxiliary_temp_column__")
+ sdf = sdf.withColumn(
+ auxiliary_col_name,
+ F.explode(
+ F.when(
+ F.col(index_1_col_name) == F.col(index_2_col_name),
+ F.lit([0]),
+ ).otherwise(F.lit([0, 1]))
+ ),
+ ).select(
+ *[F.col(key) for key in groupkey_names],
+ *[
+ F.when(F.col(auxiliary_col_name) == 0, F.col(index_1_col_name))
+ .otherwise(F.col(index_2_col_name))
+ .alias(index_1_col_name),
+ F.when(F.col(auxiliary_col_name) == 0, F.col(index_2_col_name))
+ .otherwise(F.col(index_1_col_name))
+ .alias(index_2_col_name),
+ F.col(cov_output_col_name),
+ ],
+ )
+
+ array_col_name = verify_temp_column_name(sdf,
"__groupby_cov_array_temp_column__")
+ sdf = sdf.groupby(groupkey_names + [index_1_col_name]).agg(
+ F.array_sort(
+ F.collect_list(
+ F.struct(
+ F.col(index_2_col_name),
+ F.col(cov_output_col_name),
+ )
+ )
+ ).alias(array_col_name)
+ )
+
+ for i in range(0, num_scols):
+ sdf = sdf.withColumn(auxiliary_col_name,
F.get(F.col(array_col_name), i)).withColumn(
+ numeric_col_names[i],
+ F.col(f"{auxiliary_col_name}.{cov_output_col_name}"),
+ )
+
+ sdf = sdf.orderBy(groupkey_names + [index_1_col_name])
+
+ sdf = sdf.select(
+ *[F.col(col) for col in groupkey_names + numeric_col_names],
+ *[
+ F.get(F.lit(numeric_col_names),
F.col(index_1_col_name)).alias(auxiliary_col_name),
+
F.monotonically_increasing_id().alias(NATURAL_ORDER_COLUMN_NAME),
+ ],
+ )
+
+ return DataFrame(
+ InternalFrame(
+ spark_frame=sdf,
+ index_spark_columns=[
+ scol_for(sdf, key) for key in groupkey_names +
[auxiliary_col_name]
+ ],
+ index_names=(
+ [psser._column_label for psser in self._groupkeys]
+ + self._psdf._internal.index_names
+ ),
+ column_labels=numeric_labels,
+ column_label_names=internal.column_label_names,
+ )
+ )
+
class SeriesGroupBy(GroupBy[Series]):
@staticmethod
diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py
index 04891006dee7..9e655384dfac 100644
--- a/python/pyspark/pandas/missing/groupby.py
+++ b/python/pyspark/pandas/missing/groupby.py
@@ -42,7 +42,6 @@ class MissingPandasLikeDataFrameGroupBy:
# Properties
corrwith = _unsupported_property("corrwith")
- cov = _unsupported_property("cov")
dtypes = _unsupported_property("dtypes")
groups = _unsupported_property("groups")
hist = _unsupported_property("hist")
diff --git a/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py b/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py
new file mode 100644
index 000000000000..389a50ff6e96
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.pandas.tests.groupby.test_cov import CovMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+
+
+class CovParityTests(
+ CovMixin,
+ PandasOnSparkTestUtils,
+ ReusedConnectTestCase,
+):
+ pass
+
+
+if __name__ == "__main__":
+ from pyspark.testing import main
+
+ main()
diff --git a/python/pyspark/pandas/tests/groupby/test_cov.py b/python/pyspark/pandas/tests/groupby/test_cov.py
new file mode 100644
index 000000000000..849e469b0500
--- /dev/null
+++ b/python/pyspark/pandas/tests/groupby/test_cov.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+from pyspark.testing.sqlutils import SQLTestUtils
+
+
+class CovMixin:
+ @property
+ def pdf(self):
+ return pd.DataFrame(
+ {
+ "A": [1, 1, 2, 2, 2, 3],
+ "B": [-1, 2, 3, 5, 6, 0],
+ "C": [4, 6, 5, 1, 3, 0],
+ },
+ columns=["A", "B", "C"],
+ )
+
+ @property
+ def psdf(self):
+ return ps.from_pandas(self.pdf)
+
+ def test_cov(self):
+ for c in ["A", "B", "C"]:
+ self.assert_eq(
+ self.pdf.groupby(c).cov().sort_index(),
+ self.psdf.groupby(c).cov().sort_index(),
+ almost=True,
+ )
+
+ def test_ddof(self):
+ # Use a dataset with enough rows per group to keep N - ddof > 0,
+ # since pandas and Spark diverge on inf/NaN handling when the divisor is non-positive.
+ pdf = pd.DataFrame(
+ {
+ "A": [1, 1, 1, 1, 2, 2, 2, 2],
+ "B": [1, 2, 3, 4, 5, 6, 7, 8],
+ "C": [4, 6, 5, 1, 3, 0, 9, 2],
+ },
+ columns=["A", "B", "C"],
+ )
+ psdf = ps.from_pandas(pdf)
+ for ddof in [0, 1, 2]:
+ self.assert_eq(
+ pdf.groupby("A").cov(ddof=ddof).sort_index(),
+ psdf.groupby("A").cov(ddof=ddof).sort_index(),
+ almost=True,
+ )
+
+ def test_min_periods(self):
+ pdf = pd.DataFrame(
+ {
+ "A": [1, 1, 1, 1, 2, 2],
+ "B": [1.0, 2.0, np.nan, 4.0, 5.0, 6.0],
+ "C": [4.0, 6.0, 5.0, 7.0, 1.0, 3.0],
+ },
+ columns=["A", "B", "C"],
+ )
+ psdf = ps.from_pandas(pdf)
+ for m in [1, 2, 3, 4]:
+ self.assert_eq(
+ pdf.groupby("A").cov(min_periods=m).sort_index(),
+ psdf.groupby("A").cov(min_periods=m).sort_index(),
+ almost=True,
+ )
+
+ def test_numeric_only(self):
+ pdf = pd.DataFrame(
+ {
+ "A": [1, 1, 2, 2, 2],
+ "B": [1, 2, 3, 4, 5],
+ "C": [4, 6, 5, 1, 3],
+ "D": ["x", "y", "z", "w", "v"],
+ },
+ columns=["A", "B", "C", "D"],
+ )
+ psdf = ps.from_pandas(pdf)
+ self.assert_eq(
+ pdf.groupby("A").cov(numeric_only=True).sort_index(),
+ psdf.groupby("A").cov(numeric_only=True).sort_index(),
+ almost=True,
+ )
+
+ def test_invalid_args(self):
+ with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+ self.psdf.groupby("A").cov(ddof="1")
+
+ if LooseVersion(pd.__version__) >= "3.0.0":
+ with self.assertRaisesRegex(ValueError, "numeric_only accepts only
Boolean values"):
+ self.psdf.groupby("A").cov(numeric_only="True")
+
+
+class CovTests(
+ CovMixin,
+ PandasOnSparkTestCase,
+ SQLTestUtils,
+):
+ pass
+
+
+if __name__ == "__main__":
+ from pyspark.testing import main
+
+ main()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]