[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878818#comment-16878818
 ] 

Arvind Ravish commented on ARROW-4890:
--------------------------------------

I get the same thing when running the repro code in Databricks. Can we get some 
description about what the error means?

Plan looks like this

== Physical Plan == *(7) HashAggregate(keys=[], 
functions=[finalmerge_count(merge count#506L) AS count(1)#502L|#506L) AS 
count(1)#502L]) +- Exchange SinglePartition +- *(6) HashAggregate(keys=[], 
functions=[partial_count(1) AS count#506L|#506L]) +- *(6) Project +- 
FlatMapGroupsInPandas [df1_c1#72L|#72L], myudf(df1_c1#72L, df1_c2#73, 
df1_c3#74, df1_c4#75, df2_c1#108L, df2_c2#109, df2_c3#110, df2_c4#111, 
df2_c5#112, df2_c6#113), [df1_c1#264L, df1_c2#265, df1_c3#266, df1_c4#267, 
df2_c1#268L, df2_c2#269, df2_c3#270, df2_c4#271, df2_c5#272, df2_c6#273|#264L, 
df1_c2#265, df1_c3#266, df1_c4#267, df2_c1#268L, df2_c2#269, df2_c3#270, 
df2_c4#271, df2_c5#272, df2_c6#273] +- *(5) Project [df1_c1#72L, df1_c1#72L, 
df1_c2#73, df1_c3#74, df1_c4#75, df2_c1#108L, df2_c2#109, df2_c3#110, 
df2_c4#111, df2_c5#112, df2_c6#113|#72L, df1_c1#72L, df1_c2#73, df1_c3#74, 
df1_c4#75, df2_c1#108L, df2_c2#109, df2_c3#110, df2_c4#111, df2_c5#112, 
df2_c6#113] +- *(5) SortMergeJoin [df1_c1#72L|#72L], [df2_c1#108L|#108L], Inner 
:- *(2) Sort [df1_c1#72L ASC NULLS FIRST|#72L ASC NULLS FIRST], false, 0 : +- 
Exchange hashpartitioning(df1_c1#72L, 200) : +- *(1) Project [df1_c1#72L, 
df1_c2#73, df1_c3#74, df1_c4#75|#72L, df1_c2#73, df1_c3#74, df1_c4#75] : +- 
*(1) Filter isnotnull(df1_c1#72L) : +- *(1) Scan 
ExistingRDD[index#71L,df1_c1#72L,df1_c2#73,df1_c3#74,df1_c4#75|#71L,df1_c1#72L,df1_c2#73,df1_c3#74,df1_c4#75]
 +- *(4) Sort [df2_c1#108L ASC NULLS FIRST|#108L ASC NULLS FIRST], false, 0 +- 
Exchange hashpartitioning(df2_c1#108L, 200) +- *(3) Project [df2_c1#108L, 
df2_c2#109, df2_c3#110, df2_c4#111, df2_c5#112, df2_c6#113|#108L, df2_c2#109, 
df2_c3#110, df2_c4#111, df2_c5#112, df2_c6#113] +- *(3) Filter 
isnotnull(df2_c1#108L) +- *(3) Scan 
ExistingRDD[index#107L,df2_c1#108L,df2_c2#109,df2_c3#110,df2_c4#111,df2_c5#112,df2_c6#113|#107L,df2_c1#108L,df2_c2#109,df2_c3#110,df2_c4#111,df2_c5#112,df2_c6#113]

  !image-2019-07-04-12-03-57-002.png!

An error occurred while calling o1471.showString. : 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 93 in 
stage 39.0 failed 4 times, most recent failure: Lost task 93.3 in stage 39.0 
(TID 1261, 10.139.64.6, executor 1): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() 
File "/databricks/spark/python/pyspark/worker.py", line 398, in process 
serializer.dump_stream(func(split_index, iterator), outfile) File 
"/databricks/spark/python/pyspark/serializers.py", line 296, in dump_stream for 
series in iterator: File "/databricks/spark/python/pyspark/serializers.py", 
line 319, in load_stream for batch in generator(): File 
"/databricks/spark/python/pyspark/serializers.py", line 314, in generator for 
batch in reader: File "pyarrow/ipc.pxi", line 268, in __iter__ 
(/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:70278) File 
"pyarrow/ipc.pxi", line 284, in pyarrow.lib._RecordBatchReader.read_next_batch 
(/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:70534) File 
"pyarrow/error.pxi", line 79, in pyarrow.lib.check_status 
(/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:8345) 
pyarrow.lib.ArrowIOError: read length must be positive or -1 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
 at 
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
 at 
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithoutKey_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) 
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

> Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
> --------------------------------------------------------------------
>
>                 Key: ARROW-4890
>                 URL: https://issues.apache.org/jira/browse/ARROW-4890
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.8.0
>         Environment: Cloudera cdh5.13.3
> Cloudera Spark 2.3.0.cloudera3
>            Reporter: Abdeali Kothari
>            Priority: Major
>         Attachments: Task retry fails.png, image-2019-07-04-12-03-57-002.png
>
>
> Creating this in Arrow project as the traceback seems to suggest this is an 
> issue in Arrow.
>  Continuation from the conversation on the 
> https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E
> When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:
> {noformat}
>   File 
> "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>  line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 265, in __iter__
>   File "pyarrow/ipc.pxi", line 281, in 
> pyarrow.lib._RecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> {noformat}
> as my dataset size starts increasing that I want to group on. Here is a 
> reproducible code snippet where I can reproduce this.
>  Note: My actual dataset is much larger and has many more unique IDs and is a 
> valid usecase where I cannot simplify this groupby in any way. I have 
> stripped out all the logic to make this example as simple as I could.
> {code:java}
> import os
> os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> pdf1 = pd.DataFrame(
>       [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>       columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
> )
> df1 = spark.createDataFrame(pd.concat([pdf1 for i in 
> range(429)]).reset_index()).drop('index')
> pdf2 = pd.DataFrame(
>       [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", 
> "abcdefghijklmno"]],
>       columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
> )
> df2 = spark.createDataFrame(pd.concat([pdf2 for i in 
> range(48993)]).reset_index()).drop('index')
> df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
> def myudf(df):
>     return df
> df4 = df3
> udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
> df5 = df4.groupBy('df1_c1').apply(udf)
> print('df5.count()', df5.count())
> # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
> {code}
> I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB RAM per 
> executor too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to