[ https://issues.apache.org/jira/browse/SPARK-46776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yicong Huang resolved SPARK-46776.
----------------------------------
    Fix Version/s: 4.3.0
       Resolution: Fixed

Issue resolved by pull request 56157
[https://github.com/apache/spark/pull/56157]

> Pandas UDF Serialization Error: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-46776
>                 URL: https://issues.apache.org/jira/browse/SPARK-46776
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.5.0
>         Environment: python 3.11.7
> -> pip install pyspark[sql]
> pyspark: 3.5.0
> pyarrow: 14.0.2
> pandas: 2.1.4
>            Reporter: Kyle Osborne
>            Assignee: Yicong Huang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.3.0
>
>
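> When a UDF returns a large pandas DataFrame, a column can be converted to a
> "pyarrow.lib.ChunkedArray" instead of a single Array. PyArrow returns a
> ChunkedArray when a column's string data overflows what one array can hold.
> The pyarrow "StructArray.from_arrays" function does not accept a ChunkedArray,
> so serialization fails with the error message: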
> {code}
> File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
> TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
> {code}
> Code to reproduce:
> {code:python}
> import numpy as np
> import pandas as pd
> import pyspark.sql.types as T
> from pyspark.sql import SparkSession
>
> def large_pandas_udf(iterator):
>     for pdf in iterator:
>         # 10 distinct strings of exactly 2**20 ASCII characters (1 MiB) each
>         big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
>         # 2**11 rows of 1 MiB each: 2**31 bytes of string data in one column
>         strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
>         # dtype=object keeps references to the Python strings instead of
>         # allocating a fixed-width '<U1048576' array (about 8 GiB)
>         arr = np.array(strings_list, dtype=object)
>         yield pd.DataFrame(arr, columns=['string_column'])
>
> def large_pandas_workaround_udf(iterator):
>     for pdf in iterator:
>         big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
>         strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
>         arr = np.array(strings_list, dtype=object)
>         # Workaround: yield the same rows as about 20 smaller DataFrames so
>         # that no single batch holds 2 GiB of strings
>         num_chunks = 20
>         chunk_size = len(arr) // num_chunks
>         for start in range(0, len(arr), chunk_size):
>             yield pd.DataFrame(arr[start:start + chunk_size], columns=['string_column'])
>
> def run_udf(spark, udf=large_pandas_udf):
>     schema = T.StructType([T.StructField("string_column", T.StringType())])
>     # get the full error traceback
>     spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled', 'false')
>     df = spark.createDataFrame([['v']], schema)
>     return df.mapInPandas(udf, schema).count()
>
> if __name__ == "__main__":
>     spark = SparkSession.builder.getOrCreate()
>     print(run_udf(spark, large_pandas_workaround_udf))  # the reporter's workaround
>     run_udf(spark)  # fails with the TypeError above on PySpark 3.5.0
> {code}
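A `pa.string()` array stores int32 offsets, so one array can hold at most 2**31 - 1 bytes of string data. The repro crosses that by exactly one byte: 2**11 rows of 2**20 single-byte characters is 2**31 bytes.

The sketch below checks that arithmetic. It then generalizes the fixed 20-chunk workaround into `split_by_string_bytes`, which cuts a pandas DataFrame into row slices by UTF-8 size. That helper is hypothetical (not a PySpark API), and its 1 GiB default is an arbitrary assumption, not a documented Spark threshold.

```python
from typing import Iterator

import pandas as pd

# A pa.string() array stores int32 offsets, so it can hold at most this many bytes.
INT32_MAX = 2**31 - 1

# The repro: 2**11 rows, each a 2**20-character ASCII string (1 byte per character).
REPRO_BYTES = (1 << 11) * (1 << 20)
print(REPRO_BYTES, REPRO_BYTES > INT32_MAX)  # 2147483648 True


def split_by_string_bytes(
    pdf: pd.DataFrame, column: str, max_bytes: int = 1 << 30
) -> Iterator[pd.DataFrame]:
    """Hypothetical helper (not a PySpark API): yield consecutive row slices of
    `pdf` whose UTF-8 payload in `column` stays at or below `max_bytes`.

    The 1 GiB default is an arbitrary margin below INT32_MAX. A single row that
    is larger than `max_bytes` is still yielded on its own.
    """
    sizes = [
        len(s.encode("utf-8")) if isinstance(s, str) else 0 for s in pdf[column]
    ]
    start, running = 0, 0
    for i, size in enumerate(sizes):
        # Close the current slice before it would exceed the budget.
        if i > start and running + size > max_bytes:
            yield pdf.iloc[start:i]
            start, running = i, 0
        running += size
    if start < len(pdf):
        yield pdf.iloc[start:]
```

In the repro, `large_pandas_udf` could end with `yield from split_by_string_bytes(pd.DataFrame(arr, columns=['string_column']), 'string_column')` instead of yielding one DataFrame. Like the reporter's workaround, this relies on each yielded DataFrame being serialized as its own Arrow batch.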
>  
> Stack trace:
> {code}
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "<string>", line 30, in run_udf
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1234, in count
>     return int(self._jdf.count())
>                ^^^^^^^^^^^^^^^^^
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
>     raise converted from None
> pyspark.errors.exceptions.captured.PythonException:
>   An exception was thrown from the Python worker. Please see the stack trace below.
> Traceback (most recent call last):
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
>     process()
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
>     serializer.dump_stream(out_iter, outfile)
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
>     return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream
>     for batch in iterator:
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches
>     batch = self._create_batch(series)
>             ^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 448, in _create_batch
>     arrs.append(self._create_struct_array(s, t))
>                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 410, in _create_struct_array
>     return pa.StructArray.from_arrays(struct_arrs, struct_names)
>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
> TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
> {code}
> Related GitHub issue discussing this PyArrow array-constructor behavior:
> [https://github.com/apache/arrow/issues/34755]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
