This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8ad55cd02546 [SPARK-46162][PS] Implement nunique with axis=1
8ad55cd02546 is described below
commit 8ad55cd025463021e56a77929b4da5ce96bf6d98
Author: Devin Petersohn <[email protected]>
AuthorDate: Mon Feb 23 07:21:18 2026 +0900
[SPARK-46162][PS] Implement nunique with axis=1
### What changes were proposed in this pull request?
Implement the `axis=1` (or `'columns'`) option for `pyspark.pandas.DataFrame.nunique`, which previously supported only `axis=0`.
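For illustration, a quick usage sketch of the new behavior, assuming the DataFrame from the docstring example in the diff below (`ps.DataFrame({'A': [1, 2, 3], 'B': [np.nan, 3, np.nan]})`):

```python
>>> import numpy as np
>>> import pyspark.pandas as ps
>>> df = ps.DataFrame({'A': [1, 2, 3], 'B': [np.nan, 3, np.nan]})
>>> df.nunique(axis=1)                # distinct non-null values per row
0    1
1    2
2    1
dtype: int32
>>> df.nunique(axis=1, dropna=False)  # NaN counts as a distinct value
0    2
1    2
2    2
dtype: int32
```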
### Why are the changes needed?
To fill a gap with the pandas API: pandas supports `nunique(axis=1)`, but pandas API on Spark did not.
### Does this PR introduce _any_ user-facing change?
Yes. `DataFrame.nunique` now accepts `axis=1` / `axis='columns'` instead of raising `NotImplementedError`.
### How was this patch tested?
CI, including new unit tests for `axis=1` in `python/pyspark/pandas/tests/computation/test_compute.py`.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored-by: Claude Sonnet 4.5
Closes #54319 from devin-petersohn/devin/nunique_axis.
Authored-by: Devin Petersohn <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/frame.py | 88 ++++++++++++++++------
.../pandas/tests/computation/test_compute.py | 35 ++++++++-
2 files changed, 94 insertions(+), 29 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 4cbf4a8a4530..2fa90e8e15cf 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -4899,7 +4899,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
        return self._apply_series_op(lambda psser: psser._diff(periods), should_resolve=True)
- # TODO(SPARK-46162): axis should support 1 or 'columns' either at this moment
def nunique(
self,
axis: Axis = 0,
@@ -4914,8 +4913,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
Parameters
----------
- axis : int, default 0 or 'index'
- Can only be set to 0 now.
+ axis : {0 or 'index', 1 or 'columns'}, default 0
+ The axis to use. 0 or 'index' for row-wise (count unique values per column),
+ 1 or 'columns' for column-wise (count unique values per row).
dropna : bool, default True
Don’t include NaN in the count.
approx: bool, default False
@@ -4923,13 +4923,16 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
If True, it uses the HyperLogLog approximate algorithm, which is significantly faster
for large amounts of data.
Note: This parameter is specific to pandas-on-Spark and is not found in pandas.
+ For axis=1, this parameter is ignored and exact counting is always used.
rsd: float, default 0.05
Maximum estimation error allowed in the HyperLogLog algorithm.
Note: Just like ``approx`` this parameter is specific to pandas-on-Spark.
+ For axis=1, this parameter is ignored.
Returns
-------
- The number of unique values per column as a pandas-on-Spark Series.
+ Series
+ The number of unique values per column (axis=0) or per row (axis=1).
Examples
--------
@@ -4944,6 +4947,18 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
B 2
dtype: int64
+ >>> df.nunique(axis=1)
+ 0 1
+ 1 2
+ 2 1
+ dtype: int32
+
+ >>> df.nunique(axis=1, dropna=False)
+ 0 2
+ 1 2
+ 2 2
+ dtype: int32
+
On big data, we recommend using the approximate algorithm to speed up this function.
The result will be very close to the exact unique count.
@@ -4955,29 +4970,52 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
from pyspark.pandas.series import first_series
axis = validate_axis(axis)
- if axis != 0:
- raise NotImplementedError('axis should be either 0 or "index" currently.')
- sdf = self._internal.spark_frame.select(
- [F.lit(None).cast(StringType()).alias(SPARK_DEFAULT_INDEX_NAME)]
- + [
- self._psser_for(label)._nunique(dropna, approx, rsd)
- for label in self._internal.column_labels
- ]
- )
+ if axis == 0:
+ sdf = self._internal.spark_frame.select(
+ [F.lit(None).cast(StringType()).alias(SPARK_DEFAULT_INDEX_NAME)]
+ + [
+ self._psser_for(label)._nunique(dropna, approx, rsd)
+ for label in self._internal.column_labels
+ ]
+ )
- # The data is expected to be small so it's fine to transpose/use the default index.
- with ps.option_context("compute.max_rows", 1):
- internal = self._internal.copy(
- spark_frame=sdf,
- index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)],
- index_names=[None],
- index_fields=[None],
- data_spark_columns=[
- scol_for(sdf, col) for col in self._internal.data_spark_column_names
- ],
- data_fields=None,
+ # The data is expected to be small so it's fine to transpose/use the default index.
+ with ps.option_context("compute.max_rows", 1):
+ internal = self._internal.copy(
+ spark_frame=sdf,
+ index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)],
+ index_names=[None],
+ index_fields=[None],
+ data_spark_columns=[
+ scol_for(sdf, col) for col in self._internal.data_spark_column_names
+ ],
+ data_fields=None,
+ )
+ return first_series(DataFrame(internal).transpose())
+ elif axis == 1:
+ from pyspark.pandas.series import first_series
+
+ arr = F.array(
+ *[self._internal.spark_column_for(label) for label in self._internal.column_labels]
+ )
+ arr = F.filter(arr, lambda x: x.isNotNull()) if dropna else arr
+
+ sdf = self._internal.spark_frame.select(
+ *self._internal.index_spark_columns,
+ F.size(F.array_distinct(arr)).alias(SPARK_DEFAULT_SERIES_NAME),
+ )
+
+ return first_series(
+ DataFrame(
+ InternalFrame(
+ spark_frame=sdf,
+ index_spark_columns=self._internal.index_spark_columns,
+ index_names=self._internal.index_names,
+ index_fields=self._internal.index_fields,
+ column_labels=[None],
+ )
+ )
+ )
- return first_series(DataFrame(internal).transpose())
def round(self, decimals: Union[int, Dict[Name, int], "Series"] = 0) -> "DataFrame":
"""
diff --git a/python/pyspark/pandas/tests/computation/test_compute.py b/python/pyspark/pandas/tests/computation/test_compute.py
index c3b61a08af54..a45132a20a2e 100644
--- a/python/pyspark/pandas/tests/computation/test_compute.py
+++ b/python/pyspark/pandas/tests/computation/test_compute.py
@@ -293,10 +293,33 @@ class FrameComputeMixin:
pd.Series([100], index=["A"]),
)
- # Assert unsupported axis value yet
- msg = 'axis should be either 0 or "index" currently.'
- with self.assertRaisesRegex(NotImplementedError, msg):
- psdf.nunique(axis=1)
+ # Test axis=1 (row-wise unique count)
+ # Note: Compare values only - Spark's F.size returns int32, pandas returns int64
+ self.assertEqual(psdf.nunique(axis=1).tolist(), pdf.nunique(axis=1).tolist())
+ self.assertEqual(
+ psdf.nunique(axis=1, dropna=False).tolist(), pdf.nunique(axis=1, dropna=False).tolist()
+ )
+
+ # Test axis=1 with more complex data
+ pdf2 = pd.DataFrame({"A": [1, 2, 3], "B": [1, 3, 3], "C": [2, 2, 3]})
+ psdf2 = ps.from_pandas(pdf2)
+ self.assertEqual(psdf2.nunique(axis=1).tolist(), pdf2.nunique(axis=1).tolist())
+
+ # Test axis=1 with all NaN row
+ pdf3 = pd.DataFrame({"A": [1, 2, np.nan], "B": [1, 3, np.nan], "C": [2, 2, np.nan]})
+ psdf3 = ps.from_pandas(pdf3)
+ self.assertEqual(
+ psdf3.nunique(axis=1, dropna=True).tolist(), pdf3.nunique(axis=1, dropna=True).tolist()
+ )
+ self.assertEqual(
+ psdf3.nunique(axis=1, dropna=False).tolist(),
+ pdf3.nunique(axis=1, dropna=False).tolist(),
+ )
+
+ # Test single column DataFrame with axis=1
+ pdf4 = pd.DataFrame({"A": [1, 2, 3]})
+ psdf4 = ps.from_pandas(pdf4)
+ self.assertEqual(psdf4.nunique(axis=1).tolist(), pdf4.nunique(axis=1).tolist())
# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("Y", "B")], names=["1", "2"])
@@ -305,6 +328,10 @@ class FrameComputeMixin:
self.assert_eq(psdf.nunique(), pdf.nunique())
self.assert_eq(psdf.nunique(dropna=False), pdf.nunique(dropna=False))
+ self.assertEqual(psdf.nunique(axis=1).tolist(), pdf.nunique(axis=1).tolist())
+ self.assertEqual(
+ psdf.nunique(axis=1, dropna=False).tolist(), pdf.nunique(axis=1, dropna=False).tolist()
+ )
def test_quantile(self):
pdf, psdf = self.df_pair
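As the new test comments note, the two libraries disagree on the result dtype: the Spark-side count comes from `F.size`, which yields 32-bit integers, while pandas returns int64, hence the `.tolist()` value-only comparisons above. A hypothetical REPL illustration with the tests' `psdf`/`pdf` pair:

```python
>>> psdf.nunique(axis=1).dtype  # pandas-on-Spark: backed by F.size
dtype('int32')
>>> pdf.nunique(axis=1).dtype   # plain pandas
dtype('int64')
```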