[ 
https://issues.apache.org/jira/browse/ARROW-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792065#comment-16792065
 ] 

sacheendra talluri commented on ARROW-2590:
-------------------------------------------

The problem occurs when the *order* of columns in the pandas dataframe doesn't 
match the order of columns specified in the UDF return type.

A reproducible example follows. In it, the first group-apply succeeds 
while the second one fails.

Ideally, the order of columns in the pandas dataframe should not have to 
match the order in the UDF's type definition. If the order does matter, that 
requirement should be explicitly documented 
here: https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html

 
{code:python}
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import LongType, StringType, StructField, StructType
from pyspark.sql import SparkSession


type_info1 = StructType([
    StructField("a", StringType(), False),
    StructField("b", LongType(), False),
])

type_info2 = StructType([
    StructField("b", LongType(), False),
    StructField("a", StringType(), False),
])


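# Return type lists the fields as (a, b), matching the DataFrame's column order.
@F.pandas_udf(returnType=type_info1, functionType=F.PandasUDFType.GROUPED_MAP)
def fun1(df):
    return pd.DataFrame([{"a": "cool", "b": 1}])

# Return type lists the fields as (b, a); the DataFrame's columns are still (a, b).
@F.pandas_udf(returnType=type_info2, functionType=F.PandasUDFType.GROUPED_MAP)
def fun2(df):
    return pd.DataFrame([{"a": "cool", "b": 1}])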

def main():
    spark = SparkSession.builder \
        .master("local[4]") \
        .appName("type error demo") \
        .getOrCreate()

    df1 = spark.createDataFrame([{"sample": 1}])
    # First groupby
    df1_mod1 = df1.groupBy("sample").apply(fun1)
    print(df1_mod1.collect())
    # Second groupby
    df1_mod2 = df1.groupBy("sample").apply(fun2)
    print(df1_mod2.collect())

if __name__ == "__main__":
    main()
{code}
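
As a rough illustration of why the order might matter, here is a Spark-free sketch. It assumes, as the behaviour above suggests, that the returned columns are paired with the return-type fields by position rather than by name. The helpers pair_by_position, mismatched_columns and reorder_to_schema are hypothetical names made up for this illustration; they are not Spark or Arrow APIs, and plain Python types stand in for StringType and LongType.

```python
# Hypothetical model: a "frame" is an ordered list of (column_name, values),
# and a "schema" is an ordered list of (field_name, python_type).

def pair_by_position(columns, schema):
    """Pair the i-th returned column with the i-th schema field's type."""
    return [(name, values, expected)
            for (name, values), (_, expected) in zip(columns, schema)]

def mismatched_columns(pairs):
    """Names of columns whose values are not instances of the paired type."""
    return [name for name, values, expected in pairs
            if not all(isinstance(v, expected) for v in values)]

def reorder_to_schema(columns, schema):
    """Workaround sketch: reorder the columns to follow the schema's field order."""
    by_name = dict(columns)
    return [(field_name, by_name[field_name]) for field_name, _ in schema]

returned = [("a", ["cool"]), ("b", [1])]   # column order of the returned frame
schema1 = [("a", str), ("b", int)]         # same order as the columns
schema2 = [("b", int), ("a", str)]         # same fields, different order

ok = mismatched_columns(pair_by_position(returned, schema1))
bad = mismatched_columns(pair_by_position(returned, schema2))
fixed = mismatched_columns(
    pair_by_position(reorder_to_schema(returned, schema2), schema2))

print(ok, bad, fixed)
```

If that assumption holds, a possible workaround in the real UDF would be to select the columns in schema order before returning, for example pdf[[f.name for f in type_info2.fields]], where pdf is the pandas dataframe being returned.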

> [Python] Pyspark python_udf serialization error on grouped map (Amazon EMR)
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-2590
>                 URL: https://issues.apache.org/jira/browse/ARROW-2590
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.9.0
>         Environment: Amazon EMR 5.13
> Spark 2.3.0
> PyArrow 0.9.0 (and 0.8.0)
> Pandas 0.22.0 (and 0.21.1)
> Numpy 1.14.1
>            Reporter: Daniel Fithian
>            Priority: Critical
>
> I am writing a python_udf grouped map aggregation on Spark 2.3.0 in Amazon 
> EMR. When I try to run any aggregation, I get the following Python stack 
> trace:
> {quote}18/05/16 14:08:56 ERROR Utils: Aborting task
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
>     batch = _create_batch(series, self._timezone)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array
>     return pa.Array.from_pandas(s, mask=mask, type=t)
>   File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas
>   File "array.pxi", line 177, in pyarrow.lib.array
>   File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array
>   File "error.pxi", line 98, in pyarrow.lib.check_status
> pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000){quote}
> To be clear, this happens when I run any aggregation, including the identity 
> aggregation (return the Pandas DataFrame that was passed in). I do not get 
> this error when I return an empty DataFrame, so it seems to be a symptom of 
> the serialization of the Pandas DataFrame back to Spark.
> I have observed this behavior with the following versions:
>  * Spark 2.3.0
>  * PyArrow 0.9.0 (also 0.8.0)
>  * Pandas 0.22.0 (also 0.21.1)
>  * Numpy 1.14.1
> Here is some sample code:
> {quote}{{@func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)}}
> {{def aggregation(df):}}
> {{    return df}}
> {{df.groupBy('a').apply(aggregation) # get error}}{quote}



