This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1e3d1162ac77 [SPARK-55506][PYTHON] Pass explicit input schema to `to_pandas` in `CogroupPandasUDFSerializer`
1e3d1162ac77 is described below
commit 1e3d1162ac77ff6bee80be146b9ba4642a5ad076
Author: Yicong Huang <[email protected]>
AuthorDate: Wed Feb 18 09:10:33 2026 +0800
[SPARK-55506][PYTHON] Pass explicit input schema to `to_pandas` in `CogroupPandasUDFSerializer`
### What changes were proposed in this pull request?
Pass an explicit Spark schema (derived from each Arrow table's schema via
`from_arrow_schema`) to `ArrowBatchTransformer.to_pandas()` in
`CogroupPandasUDFSerializer.load_stream()`, instead of passing `None` (the
inherited `_input_type`).
### Why are the changes needed?
`CogroupPandasUDFSerializer` is constructed without `input_type`, so
`_input_type` defaults to `None`. When `to_pandas()` receives `schema=None`, it
infers the Spark schema from the Arrow batch internally via
`from_arrow_type()`. This works, but:
1. The same `None` is used for both the left and right DataFrames, which is
conceptually wrong since the two sides can have different schemas.
2. The schema inference is implicit rather than explicit.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests in `test_pandas_cogrouped_map.py`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54293 from Yicong-Huang/SPARK-55506/fix/cogroup-input-type.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index fd7237c3426f..55cedd6f02dc 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -38,6 +38,7 @@ from pyspark.sql.conversion import (
     ArrowBatchTransformer,
 )
 from pyspark.sql.pandas.types import (
+    from_arrow_schema,
     is_variant,
     to_arrow_type,
     _create_converter_from_pandas,
@@ -1256,16 +1257,25 @@ class CogroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
         import pyarrow as pa
         for left_batches, right_batches in self._load_group_dataframes(stream, num_dfs=2):
-            yield tuple(
+            left_table = pa.Table.from_batches(left_batches)
+            right_table = pa.Table.from_batches(right_batches)
+            yield (
                 ArrowBatchTransformer.to_pandas(
-                    pa.Table.from_batches(batches),
+                    left_table,
                     timezone=self._timezone,
-                    schema=self._input_type,
+                    schema=from_arrow_schema(left_table.schema),
                     struct_in_pandas=self._struct_in_pandas,
                     ndarray_as_list=self._ndarray_as_list,
                     df_for_struct=self._df_for_struct,
-                )
-                for batches in (left_batches, right_batches)
+                ),
+                ArrowBatchTransformer.to_pandas(
+                    right_table,
+                    timezone=self._timezone,
+                    schema=from_arrow_schema(right_table.schema),
+                    struct_in_pandas=self._struct_in_pandas,
+                    ndarray_as_list=self._ndarray_as_list,
+                    df_for_struct=self._df_for_struct,
+                ),
             )
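The diff replaces a generator-expression `yield tuple(...)`, which applied identical arguments to both sides, with an explicit two-element tuple so each side gets its own schema. The shape of that refactor can be sketched independently of Spark (all names here are hypothetical stand-ins, not the real PySpark API):

```python
# Before: one expression converts both sides with the same shared schema.
def load_stream_before(pairs, convert):
    for left, right in pairs:
        yield tuple(convert(x, schema=None) for x in (left, right))

# After: each side is converted explicitly, with a schema derived from
# that side's own data.
def load_stream_after(pairs, convert, schema_of):
    for left, right in pairs:
        yield (
            convert(left, schema=schema_of(left)),
            convert(right, schema=schema_of(right)),
        )

# Toy stand-ins: `convert` pairs the data with its schema, `schema_of`
# derives a "schema" (here just the length) from each side.
pairs = [([1], [2, 3])]
out = list(load_stream_after(pairs, lambda x, schema: (x, schema), len))
print(out)  # [(([1], 1), ([2, 3], 2))]
```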
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]