This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 1720ef16dc1f [SPARK-46776][PYTHON] Support pa.ChunkedArray columns in 
createDataFrame from pandas
1720ef16dc1f is described below

commit 1720ef16dc1fa92384965b15d0ffb2d7f4e307e5
Author: Yicong Huang <[email protected]>
AuthorDate: Thu May 28 23:53:39 2026 +0000

    [SPARK-46776][PYTHON] Support pa.ChunkedArray columns in createDataFrame 
from pandas
    
    ### What changes were proposed in this pull request?
    
    `pa.Array.from_pandas` may return a `pa.ChunkedArray` (e.g. for 
`string[pyarrow]` dtype, or string data over 2 GB), which 
`pa.RecordBatch.from_arrays` rejects. Route the conversion through 
`pa.Table.from_arrays(...).to_batches()` instead, which accepts both `Array` 
and `ChunkedArray` and emits zero-copy `RecordBatch`es aligned on a common 
chunk boundary.
    
    ### Why are the changes needed?
    
    Fix `createDataFrame` from pandas raising `TypeError: Cannot convert 
pyarrow.lib.ChunkedArray to pyarrow.lib.Array`. See 
[SPARK-46776](https://issues.apache.org/jira/browse/SPARK-46776).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `createDataFrame(pandas_df)` no longer raises when a column is backed 
by a multi-chunk pyarrow array.
    
    ### How was this patch tested?
    
    New test 
`ArrowTestsMixin.test_createDataFrame_pandas_chunked_array_backed`, exercised 
on both classic and Spark Connect paths via the parity suite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #56157 from Yicong-Huang/SPARK-46776.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Yicong-Huang <[email protected]>
    (cherry picked from commit 2f4ed64204ef37fc1c7f479ad8f6a1e49e97b323)
    Signed-off-by: Yicong-Huang <[email protected]>
---
 python/pyspark/sql/connect/session.py        | 16 ++++++--------
 python/pyspark/sql/pandas/conversion.py      | 31 +++++++++++++++++++---------
 python/pyspark/sql/tests/arrow/test_arrow.py | 17 +++++++++++++++
 3 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/sql/connect/session.py 
b/python/pyspark/sql/connect/session.py
index 3edab12d852b..20d237b1c958 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -66,7 +66,7 @@ from pyspark.sql.connect.profiler import ProfilerCollector
 from pyspark.sql.connect.readwriter import DataFrameReader
 from pyspark.sql.connect.streaming.readwriter import DataStreamReader
 from pyspark.sql.connect.streaming.query import StreamingQueryManager
-from pyspark.sql.pandas.conversion import create_arrow_batch_from_pandas
+from pyspark.sql.pandas.conversion import create_arrow_table_from_pandas
 from pyspark.sql.pandas.types import (
     to_arrow_schema,
     _deduplicate_field_names,
@@ -628,15 +628,11 @@ class SparkSession:
             if len(data.columns) == 0:
                 _table = 
pa.Table.from_batches([pa.RecordBatch.from_pandas(data)])
             else:
-                _table = pa.Table.from_batches(
-                    [
-                        create_arrow_batch_from_pandas(
-                            [(c, st) for (_, c), st in zip(data.items(), 
spark_types)],
-                            timezone=cast(str, timezone),
-                            safecheck=safecheck == "true",
-                            prefers_large_types=prefers_large_types,
-                        )
-                    ]
+                _table = create_arrow_table_from_pandas(
+                    [(c, st) for (_, c), st in zip(data.items(), spark_types)],
+                    timezone=cast(str, timezone),
+                    safecheck=safecheck == "true",
+                    prefers_large_types=prefers_large_types,
                 )
 
             if isinstance(schema, StructType):
diff --git a/python/pyspark/sql/pandas/conversion.py 
b/python/pyspark/sql/pandas/conversion.py
index 3f5d68d10452..b488a49bcb6f 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -135,15 +135,25 @@ def create_arrow_array_from_pandas(
         raise PySparkValueError(error_msg % (series.dtype, series.name, 
arrow_type)) from e
 
 
-def create_arrow_batch_from_pandas(
+def create_arrow_table_from_pandas(
     series_with_types: Iterable[Tuple["pd.Series", Optional[DataType]]],
     *,
     timezone: Optional[str] = None,
     safecheck: bool = False,
     prefers_large_types: bool = False,
-) -> "pa.RecordBatch":
+) -> "pa.Table":
     """
-    Create an Arrow record batch from the given iterable of (series, 
spark_type) tuples.
+    Create an Arrow ``Table`` from the given iterable of (series, spark_type) 
tuples.
+
+    A ``pa.Table`` is used (rather than a single ``pa.RecordBatch``) because
+    ``pa.Array.from_pandas`` may return a ``pa.ChunkedArray`` when the input
+    pandas Series is backed by a chunked Arrow array (e.g. pyarrow-backed
+    extension dtypes such as ``string[pyarrow]``) or when the data exceeds
+    the maximum size of a single Arrow array (e.g. string data larger than
+    2 GB). ``pa.RecordBatch.from_arrays`` does not accept ``ChunkedArray``,
+    but ``pa.Table.from_arrays`` does. Call ``.to_batches()`` on the result
+    to obtain a zero-copy list of ``pa.RecordBatch`` aligned on a common
+    chunk boundary.
 
     Parameters
     ----------
@@ -158,8 +168,7 @@ def create_arrow_batch_from_pandas(
 
     Returns
     -------
-    pyarrow.RecordBatch
-        Arrow RecordBatch
+    pyarrow.Table
     """
     import pyarrow as pa
 
@@ -173,7 +182,7 @@ def create_arrow_batch_from_pandas(
         )
         for s, spark_type in series_with_types
     ]
-    return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
range(len(arrs))])
+    return pa.Table.from_arrays(arrs, names=["_%d" % i for i in 
range(len(arrs))])
 
 
 def _convert_arrow_table_to_pandas(
@@ -999,15 +1008,17 @@ class SparkConversionMixin:
         if len(pdf.columns) == 0:
             arrow_batches = [pa.RecordBatch.from_pandas(pdf_slice) for 
pdf_slice in pdf_slices]
         else:
-            # Create Arrow batches directly using the standalone function
+            # Each slice may produce more than one RecordBatch when a column is
+            # backed by a ChunkedArray, so flatten the per-slice tables.
             arrow_batches = [
-                create_arrow_batch_from_pandas(
+                b
+                for pdf_slice in pdf_slices
+                for b in create_arrow_table_from_pandas(
                     [(c, t) for (_, c), t in zip(pdf_slice.items(), 
spark_types)],
                     timezone=timezone,
                     safecheck=safecheck,
                     prefers_large_types=prefers_large_var_types,
-                )
-                for pdf_slice in pdf_slices
+                ).to_batches()
             ]
 
         jsparkSession = self._jsparkSession
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py 
b/python/pyspark/sql/tests/arrow/test_arrow.py
index 3e31ff637180..086869a4c60a 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -748,6 +748,23 @@ class ArrowTestsMixin:
         self.spark.createDataFrame(pdf, schema=self.schema)
         self.assertTrue(pdf.equals(pdf_copy))
 
+    def test_createDataFrame_pandas_chunked_array_backed(self):
+        # SPARK-46776: pa.Array.from_pandas can return a pa.ChunkedArray when 
the
+        # input pandas Series is backed by a multi-chunk pyarrow array (e.g. 
the
+        # pyarrow-backed string extension dtype). pa.RecordBatch.from_arrays 
does
+        # not accept ChunkedArray, so createDataFrame previously failed with
+        # "Cannot convert pyarrow.lib.ChunkedArray to pyarrow.lib.Array".
+        chunked = pa.chunked_array([pa.array(["a", "b"]), pa.array(["c", "d", 
"e"])])
+        self.assertEqual(chunked.num_chunks, 2)
+        pdf = pd.DataFrame({"s": pd.Series(chunked, dtype="string[pyarrow]")})
+
+        for arrow_enabled in [True, False]:
+            with self.subTest(arrow_enabled=arrow_enabled):
+                with 
self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+                    df = self.spark.createDataFrame(pdf)
+                self.assertEqual(df.count(), 5)
+                self.assertEqual([r.s for r in df.collect()], ["a", "b", "c", 
"d", "e"])
+
     def test_createDataFrame_arrow_truncate_timestamp(self):
         t_in = pa.Table.from_arrays(
             [pa.array([1234567890123456789], type=pa.timestamp("ns", 
tz="UTC"))], names=["ts"]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to