[
https://issues.apache.org/jira/browse/ARROW-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818419#comment-16818419
]
Christopher Groskopf commented on ARROW-2590:
---------------------------------------------
I'm experiencing this issue on Databricks 5.3 (Spark 2.4). My traceback is
nearly identical to the OP's:
{code:java}
...
  File "/databricks/spark/python/pyspark/serializers.py", line 264, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
  File "pyarrow/array.pxi", line 536, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 176, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 85, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: 'utf-32-le' codec can't decode bytes in position 0-3: code point in surrogate code point range(0xd800, 0xe000)
{code}
As best I can tell this is something _specifically_ to do with returning string
data when LongType is expected. The reproducible example above does *not* fail
for me and so far I haven't been able to write a test that replicates the issue
locally.
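For reference, the two error messages in this thread can be reproduced with the Python standard library alone. The sketch below is only an illustration of what the messages mean, under the assumption that the failing buffer contained 4-byte values that are not valid Unicode code points; it does not reproduce the PySpark failure itself, and the helper names are made up for this example.

```python
# Sketch: reproduce the two UTF-32 decode messages from the tracebacks using
# only the standard library. Assumption: the failing buffer held 4-byte values
# that are not valid Unicode code points.

def decode_as_utf32(value: int) -> str:
    """Pack an integer into 4 little-endian bytes and decode them as UTF-32-LE."""
    return value.to_bytes(4, "little").decode("utf-32-le")


def decode_reason(value: int) -> str:
    """Return the decoder's complaint for `value`, or '' if it decodes cleanly."""
    try:
        decode_as_utf32(value)
    except UnicodeDecodeError as exc:
        return exc.reason
    return ""


ok = decode_as_utf32(0x41)           # a valid code point decodes to "A"
surrogate = decode_reason(0xD800)    # surrogate range, as in the traceback above
too_large = decode_reason(0x110000)  # beyond U+10FFFF, as in the original report
```

Printing `surrogate` and `too_large` shows the same "code point ..." wording that appears after "position 0-3:" in the two tracebacks.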
I have managed to work around it fairly easily by changing the column to
return string instead of long (which is actually what it should have been in
the first place). However, this was pretty difficult to diagnose, and I suspect
there is some general case here that should be getting trapped higher up the
call stack.
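To make the "trapped higher up the call stack" idea concrete, here is a minimal sketch of a pre-flight check that compares returned values against the declared column types before anything is serialized. Everything in it is hypothetical: `find_type_mismatches`, the plain dict-of-lists layout, and the use of Python types in place of Spark types are assumptions for illustration, not part of PySpark or pyarrow.

```python
from typing import Any, Dict, List


def find_type_mismatches(
    columns: Dict[str, List[Any]], expected: Dict[str, type]
) -> List[str]:
    """Hypothetical helper: list columns whose values do not match the declared type."""
    problems = []
    for name, py_type in expected.items():
        if name not in columns:
            problems.append(f"{name}: column is missing")
            continue
        for row, value in enumerate(columns[name]):
            # None stands in for a null and is allowed for any declared type.
            if value is not None and not isinstance(value, py_type):
                problems.append(
                    f"{name}[{row}]: expected {py_type.__name__}, "
                    f"got {type(value).__name__}"
                )
                break  # one report per column is enough
    return problems


# String data in a column declared as an integer type, as described above.
result = {"id": ["a1", "b2"], "count": [3, None]}
declared = {"id": int, "count": int}
mismatches = find_type_mismatches(result, declared)
```

A check along these lines would name the offending column up front instead of surfacing as a codec error deep inside the serializer.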
Versions:
* PySpark: 2.4.0
* pandas: 0.23.4
* pyarrow: 0.12.1
> [Python] Pyspark python_udf serialization error on grouped map (Amazon EMR)
> ---------------------------------------------------------------------------
>
> Key: ARROW-2590
> URL: https://issues.apache.org/jira/browse/ARROW-2590
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Affects Versions: 0.9.0
> Environment: Amazon EMR 5.13
> Spark 2.3.0
> PyArrow 0.9.0 (and 0.8.0)
> Pandas 0.22.0 (and 0.21.1)
> Numpy 1.14.1
> Reporter: Daniel Fithian
> Priority: Critical
>
> I am writing a python_udf grouped map aggregation on Spark 2.3.0 in Amazon
> EMR. When I try to run any aggregation, I get the following Python stack
> trace:
> {code}
> 18/05/16 14:08:56 ERROR Utils: Aborting task
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
>     batch = _create_batch(series, self._timezone)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array
>     return pa.Array.from_pandas(s, mask=mask, type=t)
>   File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas
>   File "array.pxi", line 177, in pyarrow.lib.array
>   File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array
>   File "error.pxi", line 98, in pyarrow.lib.check_status
> pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)
> {code}
> To be clear, this happens when I run any aggregation, including the identity
> aggregation (return the Pandas DataFrame that was passed in). I do not get
> this error when I return an empty DataFrame, so it seems to be a symptom of
> the serialization of the Pandas DataFrame back to Spark.
> I have observed this behavior with the following versions:
> * Spark 2.3.0
> * PyArrow 0.9.0 (also 0.8.0)
> * Pandas 0.22.0 (also 0.21.1)
> * Numpy 1.14.1
> Here is some sample code:
> {code}
> @func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
> def aggregation(df):
>     return df
>
> df.groupBy('a').apply(aggregation)  # get error
> {code}