This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ad55cd02546 [SPARK-46162][PS] Implement nunique with axis=1
8ad55cd02546 is described below

commit 8ad55cd025463021e56a77929b4da5ce96bf6d98
Author: Devin Petersohn <[email protected]>
AuthorDate: Mon Feb 23 07:21:18 2026 +0900

    [SPARK-46162][PS] Implement nunique with axis=1
    
    ### What changes were proposed in this pull request?
    
    Add implementation for axis argument for `pandas.DataFrame.nunique`
    
    ### Why are the changes needed?
    
    To add missing API
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, new parameter
    
    ### How was this patch tested?
    
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored-by: Claude Sonnet 4.5
    
    Closes #54319 from devin-petersohn/devin/nunique_axis.
    
    Authored-by: Devin Petersohn <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/frame.py                     | 88 ++++++++++++++++------
 .../pandas/tests/computation/test_compute.py       | 35 ++++++++-
 2 files changed, 94 insertions(+), 29 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 4cbf4a8a4530..2fa90e8e15cf 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -4899,7 +4899,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
 
         return self._apply_series_op(lambda psser: psser._diff(periods), 
should_resolve=True)
 
-    # TODO(SPARK-46162): axis should support 1 or 'columns' either at this 
moment
     def nunique(
         self,
         axis: Axis = 0,
@@ -4914,8 +4913,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
 
         Parameters
         ----------
-        axis : int, default 0 or 'index'
-            Can only be set to 0 now.
+        axis : {0 or 'index', 1 or 'columns'}, default 0
+            The axis to use. 0 or 'index' for row-wise (count unique values 
per column),
+            1 or 'columns' for column-wise (count unique values per row).
         dropna : bool, default True
             Don’t include NaN in the count.
         approx: bool, default False
@@ -4923,13 +4923,16 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             If True, it uses the HyperLogLog approximate algorithm, which is 
significantly faster
             for large amounts of data.
             Note: This parameter is specific to pandas-on-Spark and is not 
found in pandas.
+            For axis=1, this parameter is ignored and exact counting is always 
used.
         rsd: float, default 0.05
             Maximum estimation error allowed in the HyperLogLog algorithm.
             Note: Just like ``approx`` this parameter is specific to 
pandas-on-Spark.
+            For axis=1, this parameter is ignored.
 
         Returns
         -------
-        The number of unique values per column as a pandas-on-Spark Series.
+        Series
+            The number of unique values per column (axis=0) or per row 
(axis=1).
 
         Examples
         --------
@@ -4944,6 +4947,18 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         B    2
         dtype: int64
 
+        >>> df.nunique(axis=1)
+        0    1
+        1    2
+        2    1
+        dtype: int32
+
+        >>> df.nunique(axis=1, dropna=False)
+        0    2
+        1    2
+        2    2
+        dtype: int32
+
         On big data, we recommend using the approximate algorithm to speed up 
this function.
         The result will be very close to the exact unique count.
 
@@ -4955,29 +4970,52 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         from pyspark.pandas.series import first_series
 
         axis = validate_axis(axis)
-        if axis != 0:
-            raise NotImplementedError('axis should be either 0 or "index" 
currently.')
-        sdf = self._internal.spark_frame.select(
-            [F.lit(None).cast(StringType()).alias(SPARK_DEFAULT_INDEX_NAME)]
-            + [
-                self._psser_for(label)._nunique(dropna, approx, rsd)
-                for label in self._internal.column_labels
-            ]
-        )
+        if axis == 0:
+            sdf = self._internal.spark_frame.select(
+                
[F.lit(None).cast(StringType()).alias(SPARK_DEFAULT_INDEX_NAME)]
+                + [
+                    self._psser_for(label)._nunique(dropna, approx, rsd)
+                    for label in self._internal.column_labels
+                ]
+            )
 
-        # The data is expected to be small so it's fine to transpose/use the 
default index.
-        with ps.option_context("compute.max_rows", 1):
-            internal = self._internal.copy(
-                spark_frame=sdf,
-                index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)],
-                index_names=[None],
-                index_fields=[None],
-                data_spark_columns=[
-                    scol_for(sdf, col) for col in 
self._internal.data_spark_column_names
-                ],
-                data_fields=None,
+            # The data is expected to be small so it's fine to transpose/use 
the default index.
+            with ps.option_context("compute.max_rows", 1):
+                internal = self._internal.copy(
+                    spark_frame=sdf,
+                    index_spark_columns=[scol_for(sdf, 
SPARK_DEFAULT_INDEX_NAME)],
+                    index_names=[None],
+                    index_fields=[None],
+                    data_spark_columns=[
+                        scol_for(sdf, col) for col in 
self._internal.data_spark_column_names
+                    ],
+                    data_fields=None,
+                )
+                return first_series(DataFrame(internal).transpose())
+        elif axis == 1:
+            from pyspark.pandas.series import first_series
+
+            arr = F.array(
+                *[self._internal.spark_column_for(label) for label in 
self._internal.column_labels]
+            )
+            arr = F.filter(arr, lambda x: x.isNotNull()) if dropna else arr
+
+            sdf = self._internal.spark_frame.select(
+                *self._internal.index_spark_columns,
+                F.size(F.array_distinct(arr)).alias(SPARK_DEFAULT_SERIES_NAME),
+            )
+
+            return first_series(
+                DataFrame(
+                    InternalFrame(
+                        spark_frame=sdf,
+                        index_spark_columns=self._internal.index_spark_columns,
+                        index_names=self._internal.index_names,
+                        index_fields=self._internal.index_fields,
+                        column_labels=[None],
+                    )
+                )
             )
-            return first_series(DataFrame(internal).transpose())
 
     def round(self, decimals: Union[int, Dict[Name, int], "Series"] = 0) -> 
"DataFrame":
         """
diff --git a/python/pyspark/pandas/tests/computation/test_compute.py 
b/python/pyspark/pandas/tests/computation/test_compute.py
index c3b61a08af54..a45132a20a2e 100644
--- a/python/pyspark/pandas/tests/computation/test_compute.py
+++ b/python/pyspark/pandas/tests/computation/test_compute.py
@@ -293,10 +293,33 @@ class FrameComputeMixin:
             pd.Series([100], index=["A"]),
         )
 
-        # Assert unsupported axis value yet
-        msg = 'axis should be either 0 or "index" currently.'
-        with self.assertRaisesRegex(NotImplementedError, msg):
-            psdf.nunique(axis=1)
+        # Test axis=1 (row-wise unique count)
+        # Note: Compare values only - Spark's F.size returns int32, pandas 
returns int64
+        self.assertEqual(psdf.nunique(axis=1).tolist(), 
pdf.nunique(axis=1).tolist())
+        self.assertEqual(
+            psdf.nunique(axis=1, dropna=False).tolist(), pdf.nunique(axis=1, 
dropna=False).tolist()
+        )
+
+        # Test axis=1 with more complex data
+        pdf2 = pd.DataFrame({"A": [1, 2, 3], "B": [1, 3, 3], "C": [2, 2, 3]})
+        psdf2 = ps.from_pandas(pdf2)
+        self.assertEqual(psdf2.nunique(axis=1).tolist(), 
pdf2.nunique(axis=1).tolist())
+
+        # Test axis=1 with all NaN row
+        pdf3 = pd.DataFrame({"A": [1, 2, np.nan], "B": [1, 3, np.nan], "C": 
[2, 2, np.nan]})
+        psdf3 = ps.from_pandas(pdf3)
+        self.assertEqual(
+            psdf3.nunique(axis=1, dropna=True).tolist(), pdf3.nunique(axis=1, 
dropna=True).tolist()
+        )
+        self.assertEqual(
+            psdf3.nunique(axis=1, dropna=False).tolist(),
+            pdf3.nunique(axis=1, dropna=False).tolist(),
+        )
+
+        # Test single column DataFrame with axis=1
+        pdf4 = pd.DataFrame({"A": [1, 2, 3]})
+        psdf4 = ps.from_pandas(pdf4)
+        self.assertEqual(psdf4.nunique(axis=1).tolist(), 
pdf4.nunique(axis=1).tolist())
 
         # multi-index columns
         columns = pd.MultiIndex.from_tuples([("X", "A"), ("Y", "B")], 
names=["1", "2"])
@@ -305,6 +328,10 @@ class FrameComputeMixin:
 
         self.assert_eq(psdf.nunique(), pdf.nunique())
         self.assert_eq(psdf.nunique(dropna=False), pdf.nunique(dropna=False))
+        self.assertEqual(psdf.nunique(axis=1).tolist(), 
pdf.nunique(axis=1).tolist())
+        self.assertEqual(
+            psdf.nunique(axis=1, dropna=False).tolist(), pdf.nunique(axis=1, 
dropna=False).tolist()
+        )
 
     def test_quantile(self):
         pdf, psdf = self.df_pair


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to