[
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241342#comment-17241342
]
Darshat commented on SPARK-33576:
---------------------------------
I turned on the JVM exception logging, and this is the full trace:
JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 58.0 failed 4 times, most recent failure: Lost task 1.3 in stage 58.0 (TID 3336, 10.139.64.5, executor 7): org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 412, in __iter__
  File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Invalid IPC message: negative bodyLength

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 412, in __iter__
  File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Invalid IPC message: negative bodyLength
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
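For context: the "JVM stacktrace:" section above is what PySpark prints when it is configured to attach the JVM-side trace to Python exceptions. The comment does not say which switch was flipped, so as an assumption, the Spark 3.0 option that produces this output is:

{code:python}
# Assumption: this Spark 3.0+ SQL option is what surfaced the JVM-side trace;
# when enabled, PySpark appends the JVM stacktrace to user-facing exceptions.
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "true")
{code}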
> PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC
> message: negative bodyLength'.
> ---------------------------------------------------------------------------------------------------------
>
> Key: SPARK-33576
> URL: https://issues.apache.org/jira/browse/SPARK-33576
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.0.1
> Environment: Databricks runtime 7.3
> Spark 3.0.1
> Scala 2.12
> Reporter: Darshat
> Priority: Major
>
> Hello,
> We are using Databricks on Azure to process a large amount of e-commerce data.
> The Databricks runtime is 7.3, which includes Apache Spark 3.0.1 and Scala 2.12.
> During processing, a groupby operation on the DataFrame consistently fails with
> an exception of this type:
>
> {color:#ff0000}PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
> Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 654, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 646, in process
>     serializer.dump_stream(out_iter, outfile)
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
>     timely_flush_timeout_ms=self.timely_flush_timeout_ms)
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
>     for batch in iterator:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
>     for series in iterator:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
>     for batch in batches:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
>     for batch in batches:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 412, in __iter__
>   File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
> OSError: Invalid IPC message: negative bodyLength{color}
>
> Code that causes this:
> {color:#ff0000}x = df.groupby('providerid').apply(domain_features){color}
> {color:#ff0000}display(x.info()){color}
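> For completeness, a minimal sketch of the surrounding UDF definition: the real
> domain_features body and its output schema are not shown in this report, so
> everything below except the groupby/apply call itself is hypothetical:
> {code:python}
> import pandas as pd
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # Hypothetical skeleton only; the real domain_features is not in the report.
> @pandas_udf("providerid string, feature double", PandasUDFType.GROUPED_MAP)
> def domain_features(pdf: pd.DataFrame) -> pd.DataFrame:
>     # Spark delivers all rows of one providerid to this function as a single
>     # pandas DataFrame, so the largest group bounds the worker's memory use.
>     return pdf.assign(feature=0.0)[["providerid", "feature"]]
>
> x = df.groupby('providerid').apply(domain_features)
> {code}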
> DataFrame size: 22 million rows, 31 columns.
> One of the columns is a string ('providerid') on which we do a groupby
> followed by an apply operation. There are 3 distinct provider ids in this
> set. While trying to enumerate/count the results, we get this exception.
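> For scale: if (and this is a hypothesis, not something the report establishes)
> each group is handed to the Python worker as a single Arrow record batch, the
> per-group sizes here land right around the 2 GiB point where a 32-bit length
> field would go negative:
> {code:python}
> # Back-of-envelope only; the bytes-per-row figure is a guess, not a measurement.
> rows_per_group = 22_000_000 // 3   # ~7.3M rows per distinct providerid
> bytes_per_row = 300                # assumed average width across 31 columns
> batch_bytes = rows_per_group * bytes_per_row
> print(batch_bytes, batch_bytes > 2**31)  # ~2.2e9 bytes -> past the int32 range
> {code}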
> We've put every check we can think of in the code for null values and corrupt
> data, and we are not able to trace this to application-level code. I hope we
> can get some help troubleshooting this, as it is a blocker for rolling out at
> scale.
> The cluster has 8 worker nodes plus the driver, all with 28 GB RAM. I can
> provide any other settings that could be useful.
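> One mitigation sketch, assuming the oversized-group hypothesis above: salt the
> groupby key so that no single Arrow batch approaches 2 GiB. This only preserves
> results if domain_features gives the same answer on partial groups and can be
> re-aggregated (the salt column name and modulus are made up):
> {code:python}
> from pyspark.sql import functions as F
>
> # Hypothetical workaround: split each providerid group into ~64 sub-groups.
> df_salted = df.withColumn("salt", (F.rand() * 64).cast("int"))
> x = df_salted.groupby("providerid", "salt").apply(domain_features)
> {code}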
> Hope to get some insights into the problem.
> Thanks,
> Darshat Shah
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]