[https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856558#comment-16856558]
Piotr Chowaniec edited comment on SPARK-18105 at 6/18/19 7:21 AM:
------------------------------------------------------------------
I have a similar issue with Spark 2.3.2.
Here is a stack trace:
{code:java}
org.apache.spark.scheduler.DAGScheduler : ShuffleMapStage 647 (count at Step.java:20) failed in 1.908 s due to org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
    at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:252)
    at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
    at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:349)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
    at org.apache.spark.util.Utils$.copyStream(Utils.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436)
    ... 21 more
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 2010 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:247)
    ... 29 more
{code}
It happens during an ETL process that has about 200 steps. It seems to depend on the input data, because we get these exceptions only in the production environment (on test and dev machines the same process runs without problems on different data). Unfortunately there is no way to use the production data in any other environment, so we cannot track down the difference.
Changing the compression codec to Snappy gives:
{code:java}
o.apache.spark.scheduler.TaskSetManager : Lost task 0.0 in stage 852.3 (TID 30836, localhost, executor driver): FetchFailed(BlockManagerId(driver, DNS.domena, 33588, None), shuffleId=298, mapId=2, reduceId=3, message=
org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:439)
    at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:349)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
    at org.apache.spark.util.Utils$.copyStream(Utils.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436)
    ... 20 more
{code}
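For reference, the codec switch itself was just a config change on the session. A minimal sketch, not our actual setup (the app name is a placeholder; {{spark.io.compression.codec}} defaults to {{lz4}} on Spark 2.3.x):
{code:java}
import org.apache.spark.sql.SparkSession;

// Minimal sketch of swapping the shuffle compression codec to Snappy.
// "etl-job" is a placeholder; the rest of our configuration is omitted.
SparkSession spark = SparkSession.builder()
        .appName("etl-job")
        .config("spark.io.compression.codec", "snappy")
        .getOrCreate();
{code}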
Any ideas on how to debug this issue? Is there any chance of a workaround or a real fix?
*UPDATE*
Changing the number of partitions helps in our case, but it is still only a workaround, not a real solution. The default number of shuffle partitions is 200; we changed it to 20:
{{spark.sql.shuffle.partitions=20}}
Docs: https://spark.apache.org/docs/latest/sql-performance-tuning.html
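In case someone wants to try the same workaround, a minimal sketch (the property can also be passed via {{--conf}} to spark-submit; since it is a SQL conf it can even be changed at runtime):
{code:java}
import org.apache.spark.sql.SparkSession;

// Minimal sketch: lower the shuffle partition count from the default 200 to 20.
SparkSession spark = SparkSession.builder()
        .config("spark.sql.shuffle.partitions", "20")
        .getOrCreate();

// Or on an already-running session; SQL confs are mutable at runtime:
spark.conf().set("spark.sql.shuffle.partitions", "20");
{code}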
> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Reporter: Davies Liu
> Assignee: Davies Liu
> Priority: Major
>
> When lz4 is used to compress the shuffle files, it may fail to decompress them with "Stream is corrupted":
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>     at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>     at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>     at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>     at java.io.DataInputStream.read(DataInputStream.java:149)
>     at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>     at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>     at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>     at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>     at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>     at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>     at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89