This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e3d1162ac77 [SPARK-55506][PYTHON] Pass explicit input schema to `to_pandas` in `CogroupPandasUDFSerializer`
1e3d1162ac77 is described below

commit 1e3d1162ac77ff6bee80be146b9ba4642a5ad076
Author: Yicong Huang <[email protected]>
AuthorDate: Wed Feb 18 09:10:33 2026 +0800

    [SPARK-55506][PYTHON] Pass explicit input schema to `to_pandas` in `CogroupPandasUDFSerializer`
    
    ### What changes were proposed in this pull request?
    
    Pass an explicit Spark schema (derived from each Arrow table's schema via
    `from_arrow_schema`) to `ArrowBatchTransformer.to_pandas()` in
    `CogroupPandasUDFSerializer.load_stream()`, instead of passing `None` (the
    inherited `_input_type`).
    
    ### Why are the changes needed?
    
    `CogroupPandasUDFSerializer` is constructed without `input_type`, so
    `_input_type` defaults to `None`. When `to_pandas()` receives
    `schema=None`, it infers the Spark schema from the Arrow batch internally
    via `from_arrow_type()`. This works, but:
    
    1. The same `None` is used for both the left and right DataFrames, which
       is conceptually wrong since they can have different schemas.
    2. The schema inference happens implicitly inside `to_pandas()` rather
       than being stated explicitly at the call site.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests in `test_pandas_cogrouped_map.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54293 from Yicong-Huang/SPARK-55506/fix/cogroup-input-type.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index fd7237c3426f..55cedd6f02dc 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -38,6 +38,7 @@ from pyspark.sql.conversion import (
     ArrowBatchTransformer,
 )
 from pyspark.sql.pandas.types import (
+    from_arrow_schema,
     is_variant,
     to_arrow_type,
     _create_converter_from_pandas,
@@ -1256,16 +1257,25 @@ class CogroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
         import pyarrow as pa
 
         for left_batches, right_batches in self._load_group_dataframes(stream, num_dfs=2):
-            yield tuple(
+            left_table = pa.Table.from_batches(left_batches)
+            right_table = pa.Table.from_batches(right_batches)
+            yield (
                 ArrowBatchTransformer.to_pandas(
-                    pa.Table.from_batches(batches),
+                    left_table,
                     timezone=self._timezone,
-                    schema=self._input_type,
+                    schema=from_arrow_schema(left_table.schema),
                     struct_in_pandas=self._struct_in_pandas,
                     ndarray_as_list=self._ndarray_as_list,
                     df_for_struct=self._df_for_struct,
-                )
-                for batches in (left_batches, right_batches)
+                ),
+                ArrowBatchTransformer.to_pandas(
+                    right_table,
+                    timezone=self._timezone,
+                    schema=from_arrow_schema(right_table.schema),
+                    struct_in_pandas=self._struct_in_pandas,
+                    ndarray_as_list=self._ndarray_as_list,
+                    df_for_struct=self._df_for_struct,
+                ),
             )
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
