This is an automated email from the ASF dual-hosted git repository.
zhengruifeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4306d0265a5a [SPARK-56937][PYTHON] Raise error on wrong column count
in Arrow grouped/cogrouped map UDF
4306d0265a5a is described below
commit 4306d0265a5a07a82e849061d14d61ca898b62a8
Author: Yicong Huang <[email protected]>
AuthorDate: Wed May 20 09:14:11 2026 +0800
[SPARK-56937][PYTHON] Raise error on wrong column count in Arrow
grouped/cogrouped map UDF
### What changes were proposed in this pull request?
In `verify_arrow_result` (`python/pyspark/worker.py`), the positional-assignment
branch zips the expected and actual columns without first checking their
lengths, so `zip` silently truncates to the shorter list. This PR raises
`RESULT_COLUMN_SCHEMA_MISMATCH` when the column counts differ.
### Why are the changes needed?
This bug has been latent since SPARK-40559. With
`spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName` set to false
(positional assignment), a UDF that returns the wrong number of columns either
silently drops data (too many columns) or fails with a JVM
`ArrayIndexOutOfBoundsException` (too few columns). The name-based branch
already raises a user-friendly error; the positional branch should behave the
same way.
This affects `SQL_GROUPED_MAP_ARROW_UDF`, `SQL_GROUPED_MAP_ARROW_ITER_UDF`, and
`SQL_COGROUPED_MAP_ARROW_UDF`.
### Does this PR introduce _any_ user-facing change?
Yes. In positional mode, a UDF that returns the wrong number of columns now
raises `RESULT_COLUMN_SCHEMA_MISMATCH` instead of silently truncating the
result or failing with a JVM error.
### How was this patch tested?
Added
`test_apply_in_arrow_returning_wrong_column_count_positional_assignment` to both
`test_arrow_grouped_map.py` (which also covers the iterator variant via
`function_variations`) and `test_arrow_cogrouped_map.py`, exercising both the
too-many and too-few column cases. The full grouped/cogrouped Arrow map test
suites pass.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #55978 from Yicong-Huang/SPARK-56937.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../sql/tests/arrow/test_arrow_cogrouped_map.py | 28 ++++++++++++++++++
.../sql/tests/arrow/test_arrow_grouped_map.py | 33 ++++++++++++++++++++++
python/pyspark/worker.py | 8 ++++++
3 files changed, 69 insertions(+)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
index cfeba6cbc316..5b272f89bb5d 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
@@ -230,6 +230,34 @@ class CogroupedMapInArrowTestsMixin:
# stats returns three columns while here we set schema with
two columns
self.cogrouped.applyInArrow(stats, schema="id long, m
double").collect()
+ def
test_apply_in_arrow_returning_wrong_column_count_positional_assignment(self):
+ def too_many_cols(key, left, right):
+ return pa.Table.from_pydict(
+ {
+ "a": [key[0].as_py()],
+ "b": [pc.mean(left.column("v")).as_py()],
+ "c": [pc.mean(right.column("v")).as_py()],
+ }
+ )
+
+ def too_few_cols(key, left, right):
+ return pa.Table.from_pydict({"a": [key[0].as_py()]})
+
+ with self.sql_conf(
+
{"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}
+ ):
+ with self.quiet():
+ for func, expected, actual in [
+ (too_many_cols, 2, 3),
+ (too_few_cols, 2, 1),
+ ]:
+ with self.subTest(func=func.__name__):
+ with self.assertRaisesRegex(
+ PythonException,
+ rf"Expected: {expected}.*Actual: {actual}",
+ ):
+ self.cogrouped.applyInArrow(func, schema="a long,
b double").collect()
+
def test_apply_in_arrow_returning_empty_dataframe(self):
def odd_means(key, left, right):
if key[0].as_py() == 0:
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index cefce8d0cf65..e0d40cfebe59 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -254,6 +254,39 @@ class ApplyInArrowTestsMixin:
func_variation, schema="id long, m double"
).collect()
+ def
test_apply_in_arrow_returning_wrong_column_count_positional_assignment(self):
+ df = self.data
+
+ def too_many_cols(key, table):
+ return pa.Table.from_pydict(
+ {
+ "a": [key[0].as_py()],
+ "b": [pc.mean(table.column("v")).as_py()],
+ "c": [pc.stddev(table.column("v")).as_py()],
+ }
+ )
+
+ def too_few_cols(key, table):
+ return pa.Table.from_pydict({"a": [key[0].as_py()]})
+
+ with self.sql_conf(
+
{"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}
+ ):
+ with self.quiet():
+ for func, expected, actual in [
+ (too_many_cols, 2, 3),
+ (too_few_cols, 2, 1),
+ ]:
+ with self.subTest(func=func.__name__):
+ for func_variation in function_variations(func):
+ with self.assertRaisesRegex(
+ PythonException,
+ rf"Expected: {expected}.*Actual: {actual}",
+ ):
+ df.groupby("id").applyInArrow(
+ func_variation, schema="a long, b double"
+ ).collect()
+
def test_apply_in_arrow_returning_empty_dataframe(self):
df = self.data
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 2d877565f55c..2accab06e2d6 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -547,6 +547,14 @@ def verify_arrow_result(result, assign_cols_by_name,
expected_cols_and_types):
actual_cols_and_types = [
(name, dataType) for name, dataType in
zip(result.schema.names, result.schema.types)
]
+ if len(actual_cols_and_types) != len(expected_cols_and_types):
+ raise PySparkRuntimeError(
+ errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
+ messageParameters={
+ "expected": str(len(expected_cols_and_types)),
+ "actual": str(len(actual_cols_and_types)),
+ },
+ )
column_types = [
(expected_name, expected_type, actual_type)
for (expected_name, expected_type), (actual_name, actual_type)
in zip(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]