This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 1720ef16dc1f [SPARK-46776][PYTHON] Support pa.ChunkedArray columns in
createDataFrame from pandas
1720ef16dc1f is described below
commit 1720ef16dc1fa92384965b15d0ffb2d7f4e307e5
Author: Yicong Huang <[email protected]>
AuthorDate: Thu May 28 23:53:39 2026 +0000
[SPARK-46776][PYTHON] Support pa.ChunkedArray columns in createDataFrame
from pandas
### What changes were proposed in this pull request?
`pa.Array.from_pandas` may return a `pa.ChunkedArray` (e.g. for the
`string[pyarrow]` dtype, or for string data over 2 GB), and
`pa.RecordBatch.from_arrays` rejects `ChunkedArray` inputs. Route the
conversion through `pa.Table.from_arrays(...)` instead, which accepts both
`Array` and `ChunkedArray`; calling `.to_batches()` on the resulting table
then yields zero-copy `RecordBatch`es aligned on common chunk boundaries.
### Why are the changes needed?
Fix `createDataFrame` from pandas raising `TypeError: Cannot convert
pyarrow.lib.ChunkedArray to pyarrow.lib.Array`. See
[SPARK-46776](https://issues.apache.org/jira/browse/SPARK-46776).
### Does this PR introduce _any_ user-facing change?
Yes. `createDataFrame(pandas_df)` no longer raises when a column is backed
by a multi-chunk pyarrow array.
### How was this patch tested?
Added a new test,
`ArrowTestsMixin.test_createDataFrame_pandas_chunked_array_backed`, which is
exercised on both the classic and Spark Connect paths via the parity suite.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #56157 from Yicong-Huang/SPARK-46776.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Yicong-Huang <[email protected]>
(cherry picked from commit 2f4ed64204ef37fc1c7f479ad8f6a1e49e97b323)
Signed-off-by: Yicong-Huang <[email protected]>
---
python/pyspark/sql/connect/session.py | 16 ++++++--------
python/pyspark/sql/pandas/conversion.py | 31 +++++++++++++++++++---------
python/pyspark/sql/tests/arrow/test_arrow.py | 17 +++++++++++++++
3 files changed, 44 insertions(+), 20 deletions(-)
diff --git a/python/pyspark/sql/connect/session.py
b/python/pyspark/sql/connect/session.py
index 3edab12d852b..20d237b1c958 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -66,7 +66,7 @@ from pyspark.sql.connect.profiler import ProfilerCollector
from pyspark.sql.connect.readwriter import DataFrameReader
from pyspark.sql.connect.streaming.readwriter import DataStreamReader
from pyspark.sql.connect.streaming.query import StreamingQueryManager
-from pyspark.sql.pandas.conversion import create_arrow_batch_from_pandas
+from pyspark.sql.pandas.conversion import create_arrow_table_from_pandas
from pyspark.sql.pandas.types import (
to_arrow_schema,
_deduplicate_field_names,
@@ -628,15 +628,11 @@ class SparkSession:
if len(data.columns) == 0:
_table =
pa.Table.from_batches([pa.RecordBatch.from_pandas(data)])
else:
- _table = pa.Table.from_batches(
- [
- create_arrow_batch_from_pandas(
- [(c, st) for (_, c), st in zip(data.items(),
spark_types)],
- timezone=cast(str, timezone),
- safecheck=safecheck == "true",
- prefers_large_types=prefers_large_types,
- )
- ]
+ _table = create_arrow_table_from_pandas(
+ [(c, st) for (_, c), st in zip(data.items(), spark_types)],
+ timezone=cast(str, timezone),
+ safecheck=safecheck == "true",
+ prefers_large_types=prefers_large_types,
)
if isinstance(schema, StructType):
diff --git a/python/pyspark/sql/pandas/conversion.py
b/python/pyspark/sql/pandas/conversion.py
index 3f5d68d10452..b488a49bcb6f 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -135,15 +135,25 @@ def create_arrow_array_from_pandas(
raise PySparkValueError(error_msg % (series.dtype, series.name,
arrow_type)) from e
-def create_arrow_batch_from_pandas(
+def create_arrow_table_from_pandas(
series_with_types: Iterable[Tuple["pd.Series", Optional[DataType]]],
*,
timezone: Optional[str] = None,
safecheck: bool = False,
prefers_large_types: bool = False,
-) -> "pa.RecordBatch":
+) -> "pa.Table":
"""
- Create an Arrow record batch from the given iterable of (series,
spark_type) tuples.
+ Create an Arrow ``Table`` from the given iterable of (series, spark_type)
tuples.
+
+ A ``pa.Table`` is used (rather than a single ``pa.RecordBatch``) because
+ ``pa.Array.from_pandas`` may return a ``pa.ChunkedArray`` when the input
+ pandas Series is backed by a chunked Arrow array (e.g. pyarrow-backed
+ extension dtypes such as ``string[pyarrow]``) or when the data exceeds
+ the maximum size of a single Arrow array (e.g. string data larger than
+ 2 GB). ``pa.RecordBatch.from_arrays`` does not accept ``ChunkedArray``,
+ but ``pa.Table.from_arrays`` does. Call ``.to_batches()`` on the result
+ to obtain a zero-copy list of ``pa.RecordBatch`` aligned on a common
+ chunk boundary.
Parameters
----------
@@ -158,8 +168,7 @@ def create_arrow_batch_from_pandas(
Returns
-------
- pyarrow.RecordBatch
- Arrow RecordBatch
+ pyarrow.Table
"""
import pyarrow as pa
@@ -173,7 +182,7 @@ def create_arrow_batch_from_pandas(
)
for s, spark_type in series_with_types
]
- return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in
range(len(arrs))])
+ return pa.Table.from_arrays(arrs, names=["_%d" % i for i in
range(len(arrs))])
def _convert_arrow_table_to_pandas(
@@ -999,15 +1008,17 @@ class SparkConversionMixin:
if len(pdf.columns) == 0:
arrow_batches = [pa.RecordBatch.from_pandas(pdf_slice) for
pdf_slice in pdf_slices]
else:
- # Create Arrow batches directly using the standalone function
+ # Each slice may produce more than one RecordBatch when a column is
+ # backed by a ChunkedArray, so flatten the per-slice tables.
arrow_batches = [
- create_arrow_batch_from_pandas(
+ b
+ for pdf_slice in pdf_slices
+ for b in create_arrow_table_from_pandas(
[(c, t) for (_, c), t in zip(pdf_slice.items(),
spark_types)],
timezone=timezone,
safecheck=safecheck,
prefers_large_types=prefers_large_var_types,
- )
- for pdf_slice in pdf_slices
+ ).to_batches()
]
jsparkSession = self._jsparkSession
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py
b/python/pyspark/sql/tests/arrow/test_arrow.py
index 3e31ff637180..086869a4c60a 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -748,6 +748,23 @@ class ArrowTestsMixin:
self.spark.createDataFrame(pdf, schema=self.schema)
self.assertTrue(pdf.equals(pdf_copy))
+ def test_createDataFrame_pandas_chunked_array_backed(self):
+ # SPARK-46776: pa.Array.from_pandas can return a pa.ChunkedArray when
the
+ # input pandas Series is backed by a multi-chunk pyarrow array (e.g.
the
+ # pyarrow-backed string extension dtype). pa.RecordBatch.from_arrays
does
+ # not accept ChunkedArray, so createDataFrame previously failed with
+ # "Cannot convert pyarrow.lib.ChunkedArray to pyarrow.lib.Array".
+ chunked = pa.chunked_array([pa.array(["a", "b"]), pa.array(["c", "d",
"e"])])
+ self.assertEqual(chunked.num_chunks, 2)
+ pdf = pd.DataFrame({"s": pd.Series(chunked, dtype="string[pyarrow]")})
+
+ for arrow_enabled in [True, False]:
+ with self.subTest(arrow_enabled=arrow_enabled):
+ with
self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+ df = self.spark.createDataFrame(pdf)
+ self.assertEqual(df.count(), 5)
+ self.assertEqual([r.s for r in df.collect()], ["a", "b", "c",
"d", "e"])
+
def test_createDataFrame_arrow_truncate_timestamp(self):
t_in = pa.Table.from_arrays(
[pa.array([1234567890123456789], type=pa.timestamp("ns",
tz="UTC"))], names=["ts"]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]