This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 84f978ae6b42 [SPARK-46165][PS] Add support for pandas.DataFrame.all
axis=1
84f978ae6b42 is described below
commit 84f978ae6b426940f5d7f859a6a1f4da9ebaa9ce
Author: Devin Petersohn <[email protected]>
AuthorDate: Fri Jan 16 19:30:26 2026 -0800
[SPARK-46165][PS] Add support for pandas.DataFrame.all axis=1
### What changes were proposed in this pull request?
Add support for `pandas.DataFrame.all` axis=1
### Why are the changes needed?
To support a missing API parameter
### Does this PR introduce _any_ user-facing change?
Yes, new parameter for existing API
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53507 from devin-petersohn/devin/all_axis_46165.
Authored-by: Devin Petersohn <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
---
python/pyspark/pandas/frame.py | 62 ++++++++++++++++------
.../pandas/tests/computation/test_any_all.py | 34 ++++++++++--
2 files changed, 77 insertions(+), 19 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 23ac31c8ebfb..63a8998487f5 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -11058,11 +11058,13 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
Parameters
----------
- axis : {0 or 'index'}, default 0
+ axis : {0, 'index', 1, or 'columns'}, default 0
Indicate which axis or axes should be reduced.
* 0 / 'index' : reduce the index, return a Series whose index is
the
original column labels.
+ * 1 / 'columns' : reduce the columns, return a Series whose index
is the
+ original index.
bool_only : bool, default None
Include only boolean columns. If None, will attempt to use
everything,
@@ -11118,28 +11120,58 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
dtype: bool
"""
axis = validate_axis(axis)
- if axis != 0:
- raise NotImplementedError('axis should be either 0 or "index"
currently.')
-
column_labels = self._internal.column_labels
if bool_only:
column_labels = self._bool_column_labels(column_labels)
if len(column_labels) == 0:
return ps.Series([], dtype=bool)
+ if axis == 0:
+ applied: List[PySparkColumn] = []
+ for label in column_labels:
+ scol = self._internal.spark_column_for(label)
- applied: List[PySparkColumn] = []
- for label in column_labels:
- scol = self._internal.spark_column_for(label)
+ if isinstance(self._internal.spark_type_for(label),
NumericType) or skipna:
+ # np.nan takes no effect to the result; None takes no
effect if `skipna`
+ all_col = F.min(F.coalesce(scol.cast("boolean"),
F.lit(True)))
+ else:
+ # Take None as False when not `skipna`
+ all_col = F.min(
+ F.when(scol.isNull(),
F.lit(False)).otherwise(scol.cast("boolean"))
+ )
+ applied.append(F.when(all_col.isNull(),
True).otherwise(all_col))
- if isinstance(self._internal.spark_type_for(label), NumericType)
or skipna:
- # np.nan takes no effect to the result; None takes no effect
if `skipna`
- all_col = F.min(F.coalesce(scol.cast("boolean"), F.lit(True)))
- else:
- # Take None as False when not `skipna`
- all_col = F.min(F.when(scol.isNull(),
F.lit(False)).otherwise(scol.cast("boolean")))
- applied.append(F.when(all_col.isNull(), True).otherwise(all_col))
+ return self._result_aggregated(column_labels, applied)
+ elif axis == 1:
+ from pyspark.pandas.series import first_series
- return self._result_aggregated(column_labels, applied)
+            sdf = self._internal.spark_frame.select(
+                *self._internal.index_spark_columns,
+ F.least(
+ *[
+ F.coalesce(
+
self._internal.spark_column_for(label).cast("boolean"),
+ # pandas treats all NA values as True in `all()`
+ F.lit(True),
+ )
+ for label in column_labels
+ ],
+ F.lit(True), # Handle one-column DataFrame case
+ ).alias(SPARK_DEFAULT_SERIES_NAME),
+ )
+ return first_series(
+ DataFrame(
+ InternalFrame(
+ spark_frame=sdf,
+ index_spark_columns=self._internal.index_spark_columns,
+ index_names=self._internal.index_names,
+ index_fields=self._internal.index_fields,
+ column_labels=[None],
+ )
+ )
+ )
+ else:
+ # axis=None case - return single boolean value
+ raise NotImplementedError('axis should be 0, 1, "index", or
"columns" currently.')
def any(
self,
diff --git a/python/pyspark/pandas/tests/computation/test_any_all.py
b/python/pyspark/pandas/tests/computation/test_any_all.py
index a179cc927acf..9b35f0b612d5 100644
--- a/python/pyspark/pandas/tests/computation/test_any_all.py
+++ b/python/pyspark/pandas/tests/computation/test_any_all.py
@@ -62,6 +62,29 @@ class FrameAnyAllMixin:
self.assert_eq(psdf.all(bool_only=True), pdf.all(bool_only=True))
self.assert_eq(psdf.all(bool_only=False), pdf.all(bool_only=False))
+ # Test axis=1
+ self.assert_eq(psdf.all(axis=1), pdf.all(axis=1))
+ self.assert_eq(psdf.all(axis=1, bool_only=True), pdf.all(axis=1,
bool_only=True))
+ self.assert_eq(psdf.all(axis=1, bool_only=False), pdf.all(axis=1,
bool_only=False))
+
+ # Test axis='index'
+ self.assert_eq(psdf.all(axis="index"), pdf.all(axis="index"))
+ self.assert_eq(
+ psdf.all(axis="index", bool_only=True), pdf.all(axis="index",
bool_only=True)
+ )
+ self.assert_eq(
+ psdf.all(axis="index", bool_only=False), pdf.all(axis="index",
bool_only=False)
+ )
+
+ # Test axis='columns'
+ self.assert_eq(psdf.all(axis="columns"), pdf.all(axis="columns"))
+ self.assert_eq(
+ psdf.all(axis="columns", bool_only=True), pdf.all(axis="columns",
bool_only=True)
+ )
+ self.assert_eq(
+ psdf.all(axis="columns", bool_only=False), pdf.all(axis="columns",
bool_only=False)
+ )
+
columns.names = ["X", "Y"]
pdf.columns = columns
psdf.columns = columns
@@ -70,16 +93,19 @@ class FrameAnyAllMixin:
self.assert_eq(psdf.all(bool_only=True), pdf.all(bool_only=True))
self.assert_eq(psdf.all(bool_only=False), pdf.all(bool_only=False))
- with self.assertRaisesRegex(
- NotImplementedError, 'axis should be either 0 or "index"
currently.'
- ):
- psdf.all(axis=1)
+ # Test axis=1
+ self.assert_eq(psdf.all(axis=1), pdf.all(axis=1))
+ self.assert_eq(psdf.all(axis=1, bool_only=True), pdf.all(axis=1,
bool_only=True))
+ self.assert_eq(psdf.all(axis=1, bool_only=False), pdf.all(axis=1,
bool_only=False))
# Test skipna
pdf = pd.DataFrame({"A": [True, True], "B": [1, np.nan], "C": [True,
None]})
pdf.name = "x"
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf[["A", "B"]].all(skipna=False), pdf[["A",
"B"]].all(skipna=False))
+ self.assert_eq(
+ psdf[["A", "B"]].all(axis=1, skipna=False), pdf[["A",
"B"]].all(axis=1, skipna=False)
+ )
self.assert_eq(psdf[["A", "C"]].all(skipna=False), pdf[["A",
"C"]].all(skipna=False))
self.assert_eq(psdf[["B", "C"]].all(skipna=False), pdf[["B",
"C"]].all(skipna=False))
self.assert_eq(psdf.all(skipna=False), pdf.all(skipna=False))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]