[ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-24492:
------------------------------
    Attachment: retry_stage.png
                螢幕快照 2018-05-16 上午11.10.57.png
                螢幕快照 2018-05-16 上午11.10.46.png

> Endless attempted task when TaskCommitDenied exception
> ------------------------------------------------------
>
>                 Key: SPARK-24492
>                 URL: https://issues.apache.org/jira/browse/SPARK-24492
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>         Environment: Spark version: spark-2.2.0
>            Reporter: Yu-Jhe Li
>            Priority: Critical
>         Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
> write output files to S3, some tasks retry endlessly and all of them fail with a 
> TaskCommitDenied exception. This happens when the application runs on instances 
> with network issues. (it runs well on healthy spot instances)
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows that one task of stage 112 failed due to FetchFailedException 
> (a network issue) and the stage was retried as stage 112 (retry 1). But in stage 
> 112 (retry 1), all tasks failed due to the TaskCommitDenied exception and keep 
> retrying (they never succeed and generate lots of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (a network issue 
> caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed while writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
>         at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
>         at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
>         at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>         at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>         at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>         at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
>         at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
>         at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>         at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Stream is corrupted
>         at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
>         at 
> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
>         at 
> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
>         at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
>         at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>         at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>         at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
>         at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
>         ... 31 more
> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 11718 of input 
> buffer
>         at 
> net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
>         at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:206)
>         ... 39 more
> )
> 2018-05-16 02:39:058 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=10; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:39:059 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=3; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:002 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=4; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:002 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=9; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:004 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=15; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=12; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=20; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=22; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:005 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=25; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:006 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=16; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:006 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=7; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:007 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=0; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:007 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=17; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=1; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=13; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=5; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=34; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=6; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:008 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=8; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:009 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=26; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:009 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=19; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:009 WARN  OutputCommitCoordinator:66 - Authorizing duplicate 
> request to commit for attemptNumber=0 to commit for stage=112, partition=14; 
> existingCommitter = 0. This can indicate dropped network traffic.
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 92.0 in stage 112.1 
> (TID 43815, 10.124.39.217, executor 29): org.apache.spark.SparkException: 
> Task failed while writing rows
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: 
> s3a://xxx/1526437970755/_temporary/0/_temporary/attempt_20180516023940_0112_m_000092_0
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
>         at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:426)
>         at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
>         at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
>         at 
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
>         at 
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
>         at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:153)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>         at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>         ... 8 more
> 2018-05-16 02:40:026 WARN  JobProgressListener:66 - Task start for unknown 
> stage 112
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 51.0 in stage 112.1 
> (TID 43774, 10.26.158.82, executor 8): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 51, attemptNumber: 0
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 125.0 in stage 112.1 
> (TID 43848, 10.124.42.170, executor 84): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 125, attemptNumber: 0
> 2018-05-16 02:40:026 WARN  TaskSetManager:66 - Lost task 122.0 in stage 112.1 
> (TID 43845, 172.31.18.157, executor 134): TaskCommitDenied (Driver denied 
> task commit) for job: 112, partition: 122, attemptNumber: 0
> 2018-05-16 02:40:027 WARN  TaskSetManager:66 - Lost task 118.0 in stage 112.1 
> (TID 43841, 10.95.1.104, executor 100): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 118, attemptNumber: 0
> 2018-05-16 02:40:027 WARN  TaskSetManager:66 - Lost task 79.0 in stage 112.1 
> (TID 43802, 172.31.22.115, executor 94): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 79, attemptNumber: 0
> 2018-05-16 02:40:027 WARN  TaskSetManager:66 - Lost task 58.0 in stage 112.1 
> (TID 43781, 10.26.158.82, executor 8): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 58, attemptNumber: 0
> 2018-05-16 02:41:008 WARN  TaskSetManager:66 - Lost task 51.1 in stage 112.1 
> (TID 46941, 172.31.26.185, executor 57): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 51, attemptNumber: 1
> 2018-05-16 02:41:010 WARN  TaskSetManager:66 - Lost task 92.1 in stage 112.1 
> (TID 46937, 10.92.157.108, executor 153): TaskCommitDenied (Driver denied 
> task commit) for job: 112, partition: 92, attemptNumber: 1
> 2018-05-16 02:41:010 WARN  TaskSetManager:66 - Lost task 125.1 in stage 112.1 
> (TID 46951, 10.31.223.165, executor 149): TaskCommitDenied (Driver denied 
> task commit) for job: 112, partition: 125, attemptNumber: 1
> 2018-05-16 02:41:011 WARN  TaskSetManager:66 - Lost task 79.1 in stage 112.1 
> (TID 46975, 10.26.158.82, executor 8): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 79, attemptNumber: 1
> 2018-05-16 02:41:013 WARN  TaskSetManager:66 - Lost task 58.1 in stage 112.1 
> (TID 46976, 10.29.28.124, executor 118): TaskCommitDenied (Driver denied task 
> commit) for job: 112, partition: 58, attemptNumber: 1
> {noformat}
> I think we should have a way to avoid the endless attempts.
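> For context, here is a minimal sketch of why the retries never stop. It is my own 
> simplification, not the actual Spark source, and it assumes Spark 2.2 behaviour: the 
> OutputCommitCoordinator still holds the committer it authorized for the partition in 
> the earlier stage attempt, and a TaskCommitDenied failure does not count toward 
> spark.task.maxFailures (countTowardsTaskFailures is false), so the scheduler just 
> keeps rescheduling the denied task:
> {code:scala}
> import scala.collection.mutable
> 
> // Hypothetical, trimmed-down model of the coordinator's per-partition bookkeeping.
> object EndlessCommitDenySketch {
>   // partition -> attemptNumber that currently holds the commit right
>   private val committers = mutable.Map.empty[Int, Int]
> 
>   def canCommit(partition: Int, attemptNumber: Int): Boolean =
>     committers.get(partition) match {
>       case None                          => committers(partition) = attemptNumber; true
>       case Some(a) if a == attemptNumber => true   // "Authorizing duplicate request ..."
>       case Some(_)                       => false  // executor sees TaskCommitDenied
>     }
> 
>   def main(args: Array[String]): Unit = {
>     canCommit(partition = 51, attemptNumber = 0)   // stage 112.0 authorizes attempt 0
>     // Stage 112 is retried; attempt 0's temporary output is gone, so its commit
>     // fails, but the coordinator still lists attempt 0 as the committer.
>     var failuresCountedTowardMaxFailures = 0
>     for (attempt <- 1 to 5) {
>       val denied = !canCommit(partition = 51, attemptNumber = attempt)
>       // A denied attempt is rescheduled without moving the task toward
>       // spark.task.maxFailures, so the loop below never accumulates failures.
>       if (denied) failuresCountedTowardMaxFailures += 0
>     }
>     println(s"counted failures: $failuresCountedTowardMaxFailures")  // stays 0 -> retried forever
>   }
> }
> {code}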


