[ https://issues.apache.org/jira/browse/SPARK-26067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abdeali Kothari updated SPARK-26067:
------------------------------------
Description:

When I use Spark's Pandas GROUPED_MAP UDFs to apply a UDAF I wrote in Python/pandas to a grouped DataFrame in Spark, it fails if the number of columns is greater than 255 on Python 3.6 and lower.

{code:python}
import pyspark
from pyspark.sql import types as T, functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [[i for i in range(256)], [i + 1 for i in range(256)]],
    schema=["a" + str(i) for i in range(256)])

new_schema = T.StructType(
    [field for field in df.schema] + [T.StructField("new_row", T.DoubleType())])

def myfunc(df):
    df['new_row'] = 1.0  # double, to match the DoubleType declared in new_schema
    return df

myfunc_udf = F.pandas_udf(new_schema, F.PandasUDFType.GROUPED_MAP)(myfunc)

df2 = df.groupBy(["a1"]).apply(myfunc_udf)
print(df2.count())  # This FAILS

# ERROR:
# Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
#   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 219, in main
#     func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
#   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 148, in read_udfs
#     mapper = eval(mapper_str, udfs)
#   File "<string>", line 1
# SyntaxError: more than 255 arguments
{code}

I believe this is happening because, internally, Spark creates a UDF that takes every column of the DataFrame as an input:
https://github.com/apache/spark/blob/41c2227a2318029709553a588e44dee28f106350/python/pyspark/sql/group.py#L274

Note: Python 3.7 removed the 255-argument limit, but I have not tried Python 3.7:
https://docs.python.org/3.7/whatsnew/3.7.html#other-language-changes

I was using Python 3.5 (from Anaconda) and Spark 2.3.1, and reproduced this on my Hadoop Linux cluster as well as on a standalone Spark installation on my Mac.
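For reference, the 255-argument ceiling is a CPython limitation rather than a Spark one: {{read_udfs}} builds a mapper expression over all input columns and {{eval}}s it, and on Python 3.6 and lower the compiler rejects any call with more than 255 arguments. The sketch below is illustrative only (the real {{mapper_str}} that worker.py generates is more involved), but it reproduces the same SyntaxError in plain Python:

{code:python}
# Standalone sketch of the limit that pyspark/worker.py hits via eval():
# compiling a call expression with more than 255 arguments.
# On Python 3.6 and lower this raises "SyntaxError: more than 255 arguments";
# on Python 3.7+ the limit was removed and the source compiles fine.
args = ", ".join("a" + str(i) for i in range(256))  # "a0, a1, ..., a255"
mapper_str = "lambda {0}: f({0})".format(args)      # a call with 256 arguments

try:
    mapper = eval(mapper_str, {"f": lambda *cols: cols})
    print("compiled fine -- Python 3.7+")
except SyntaxError as exc:
    print("SyntaxError:", exc.msg)  # "more than 255 arguments" on <= 3.6
{code}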
> Pandas GROUPED_MAP udf breaks if DF has >255 columns
> ----------------------------------------------------
>
>                 Key: SPARK-26067
>                 URL: https://issues.apache.org/jira/browse/SPARK-26067
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: Abdeali Kothari
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org