Kyle Osborne created SPARK-46776:
------------------------------------
Summary: Pandas UDF Serialization Error: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
Key: SPARK-46776
URL: https://issues.apache.org/jira/browse/SPARK-46776
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.5.0
Environment: python 3.11.7
-> pip install pyspark[sql]
pyspark: 3.5.0
pyarrow: 14.0.2
pandas: 2.1.4
Reporter: Kyle Osborne
When returning a large pandas DataFrame from a UDF, a column can get converted to a
"pyarrow.lib.ChunkedArray", which the pyarrow "StructArray.from_arrays" function does not
accept, so serialization fails with the error:
{code:java}
File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'> {code}
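For context, here is a minimal, Spark-independent sketch of the pyarrow behavior involved (illustrative only, not taken from the Spark source): "StructArray.from_arrays" only accepts plain Arrays, so any column that arrives as a "ChunkedArray" triggers exactly this TypeError, and "combine_chunks()" is one way to flatten it back while the data still fits in a single chunk.
{code:python}
import pyarrow as pa

# Build a ChunkedArray explicitly; in the failing case above, the large
# string column arrives as a ChunkedArray rather than a plain Array.
chunked = pa.chunked_array([pa.array(["a", "b"]), pa.array(["c"])])

# StructArray.from_arrays only accepts plain Arrays, so this raises
# TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
try:
    pa.StructArray.from_arrays([chunked], ["string_column"])
except TypeError as e:
    print(e)

# Flattening the chunks first yields a plain Array that is accepted
# (only possible while the combined data still fits in one chunk).
combined = chunked.combine_chunks()
print(pa.StructArray.from_arrays([combined], ["string_column"]))
{code}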
Code to reproduce:
{code:python}
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf
import numpy as np
import pyarrow as pa
import pandas as pd


def large_pandas_udf(iterator):
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        yield pd.DataFrame(arr, columns=['string_column'])


def large_pandas_workaround_udf(iterator):
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        num_chunks = 20
        chunk_size = len(arr) // 20
        for i in range(num_chunks + 10):
            yield pd.DataFrame(arr[i * chunk_size:(i + 1) * chunk_size], columns=['string_column'])


def run_udf(spark, udf=large_pandas_udf):
    schema = T.StructType([T.StructField("string_column", T.StringType())])
    spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled', 'false')  # get full error traceback
    df = spark.createDataFrame([['v']], schema)
    return df.mapInPandas(udf, schema).count()
{code}
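For completeness, one way to invoke the reproduction above (a sketch; the local master URL is only illustrative):
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

run_udf(spark)                                   # fails with the TypeError in the stacktrace below
run_udf(spark, udf=large_pandas_workaround_udf)  # succeeds by yielding smaller DataFrames per batch
{code}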
Stacktrace:
{code:java}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 30, in run_udf
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1234, in count
    return int(self._jdf.count())
               ^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream
    for batch in iterator:
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches
    batch = self._create_batch(series)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 448, in _create_batch
    arrs.append(self._create_struct_array(s, t))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 410, in _create_struct_array
    return pa.StructArray.from_arrays(struct_arrs, struct_names)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
{code}
Related GitHub issue discussing this pyarrow array-constructor behavior:
[https://github.com/apache/arrow/issues/34755]