[ https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315077#comment-14315077 ]
Josh Rosen commented on SPARK-4879:
-----------------------------------

This issue is _really_ hard to reproduce, but I managed to trigger the original bug as part of the testing for my patch. Here's what I ran:

{code}
~/spark-1.3.0-SNAPSHOT-bin-1.0.4/bin/spark-shell --conf spark.speculation.multiplier=1 --conf spark.speculation.quantile=0.01 --conf spark.speculation=true --conf spark.hadoop.outputCommitCoordination.enabled=false
{code}

{code}
val numTasks = 100
val numTrials = 100
val outputPath = "/output-committer-bug-"
val sleepDuration = 1000

for (trial <- 0 to (numTrials - 1)) {
  val outputLocation = outputPath + trial
  sc.parallelize(1 to numTasks, numTasks).mapPartitionsWithContext { case (ctx, iter) =>
    if (ctx.partitionId % 5 == 0) {
      if (ctx.attemptNumber == 0) {
        // If this is the first attempt, run slow
        Thread.sleep(sleepDuration)
      }
    }
    iter
  }.map(identity).saveAsTextFile(outputLocation)
  Thread.sleep(sleepDuration * 2)
  println("TESTING OUTPUT OF TRIAL " + trial)
  val savedData = sc.textFile(outputLocation).map(_.toInt).collect()
  if (savedData.toSet != (1 to numTasks).toSet) {
    println("MISSING: " + ((1 to numTasks).toSet -- savedData.toSet))
    assert(false)
  }
  println("-" * 80)
}
{code}
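For reference, the same configuration can also be set programmatically instead of via {{spark-shell}} flags; this is just a rough equivalent, assuming the same property names:

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Same speculation settings as the spark-shell flags above, set programmatically.
// spark.hadoop.outputCommitCoordination.enabled=false turns off the commit
// coordination added by the patch, so the original bug can still be triggered.
val conf = new SparkConf()
  .setAppName("output-committer-bug-repro")
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "1")
  .set("spark.speculation.quantile", "0.01")
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
val sc = new SparkContext(conf)
{code}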
It took 22 runs until I actually observed missing output partitions (several of the earlier runs threw spurious exceptions and didn't have missing outputs):

{code}
[...]
15/02/10 22:17:21 INFO scheduler.DAGScheduler: Job 66 finished: saveAsTextFile at <console>:39, took 2.479592 s
15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 75.0 in stage 66.0 (TID 6861, ip-172-31-1-124.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201502102217_0066_m_000075_6861
        at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
        at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
        at org.apache.spark.SparkHadoopWriter.performCommit$1(SparkHadoopWriter.scala:113)
        at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:150)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 80.0 in stage 66.0 (TID 6866, ip-172-31-11-151.us-west-2.compute.internal): java.io.IOException: The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!
        at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
        at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 85.0 in stage 66.0 (TID 6871) on executor ip-172-31-1-124.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!) [duplicate 1]
15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 90.0 in stage 66.0 (TID 6876) on executor ip-172-31-1-124.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!) [duplicate 2]
15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 95.0 in stage 66.0 (TID 6881) on executor ip-172-31-11-151.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!) [duplicate 3]
15/02/10 22:17:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 66.0, whose tasks have all completed, from pool
TESTING OUTPUT OF TRIAL 22
[...]
15/02/10 22:17:23 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool
15/02/10 22:17:23 INFO scheduler.DAGScheduler: Stage 67 (collect at <console>:42) finished in 0.158 s
15/02/10 22:17:23 INFO scheduler.DAGScheduler: Job 67 finished: collect at <console>:42, took 0.165580 s
MISSING: Set(76)
java.lang.AssertionError: assertion failed
[...]
{code}

And I confirmed that it's missing in HDFS:

{code}
~/ephemeral-hdfs/bin/hadoop fs -ls /output-committer-bug-22 | grep part | wc -l
Warning: $HADOOP_HOME is deprecated.
99
{code}

To test my patch, I'm going to remove the flag that disables it and run this for a huge number of trials to ensure that we don't hit the missing output bug.
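Concretely, that patched run is just the same invocation without the {{spark.hadoop.outputCommitCoordination.enabled=false}} flag (and with {{numTrials}} in the script above raised); a sketch:

{code}
~/spark-1.3.0-SNAPSHOT-bin-1.0.4/bin/spark-shell --conf spark.speculation.multiplier=1 --conf spark.speculation.quantile=0.01 --conf spark.speculation=true
{code}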
> Missing output partitions after job completes with speculative execution
> -------------------------------------------------------------------------
>
>                 Key: SPARK-4879
>                 URL: https://issues.apache.org/jira/browse/SPARK-4879
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.0.2, 1.1.1, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>        Attachments: speculation.txt, speculation2.txt
>
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that save output files may report that they have completed successfully even though some output partitions written by speculative tasks may be missing.
>
> h3. Reproduction
>
> This symptom was reported to me by a Spark user and I've been doing my own investigation to try to come up with an in-house reproduction.
>
> I'm still working on a reliable local reproduction for this issue, which is a little tricky because Spark won't schedule speculated tasks on the same host as the original task, so you need an actual (or containerized) multi-host cluster to test speculation. Here's a simple reproduction of some of the symptoms on EC2, which can be run in {{spark-shell}} with {{--conf spark.speculation=true}}:
>
> {code}
> // Rig a job such that all but one of the tasks complete instantly
> // and one task runs for 20 seconds on its first attempt and instantly
> // on its second attempt:
> val numTasks = 100
> sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
>   if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
>     if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
>       Thread.sleep(20 * 1000)
>     }
>   }
>   iter
> }.map(x => (x, x)).saveAsTextFile("/test4")
> {code}
>
> When I run this, I end up with a job that completes quickly (due to speculation) but reports failures from the speculated task:
>
> {code}
> [...]
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
> 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
> 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully
> scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_000049_413
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
>         org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
>         org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> {code}
>
> One interesting thing to note about this stack trace: if we look at {{FileOutputCommitter.java:160}} ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]), this point in the execution seems to correspond to a case where a task completes, attempts to commit its output, fails for some reason, then deletes the destination file, tries again, and fails:
>
> {code}
>     if (fs.isFile(taskOutput)) {
> 152   Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
> 153       getTempTaskOutputPath(context));
> 154   if (!fs.rename(taskOutput, finalOutputPath)) {
> 155     if (!fs.delete(finalOutputPath, true)) {
> 156       throw new IOException("Failed to delete earlier output of task: " +
> 157                             attemptId);
> 158     }
> 159     if (!fs.rename(taskOutput, finalOutputPath)) {
> 160       throw new IOException("Failed to save output of task: " +
> 161                             attemptId);
> 162     }
> 163   }
> {code}
>
> This could explain why the output file is missing: the second copy of the task keeps running after the job completes and deletes the output written by the other task after failing to commit its own copy of the output.
>
> There are still a few open questions about how exactly we get into this scenario:
>
> *Why is the second copy of the task allowed to commit its output after the other task / the job has successfully completed?*
>
> To check whether a task's temporary output should be committed, SparkHadoopWriter calls {{FileOutputCommitter.needsTaskCommit()}}, which returns {{true}} if the task's temporary output exists ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#206]). This does not seem to check whether the destination already exists, which means that {{needsTaskCommit}} can return {{true}} for speculative tasks.
>
> *Why does the rename fail?*
>
> I think that what's happening is that the temporary task output files are being deleted once the job has completed, which is causing the {{rename}} to fail because {{FileOutputCommitter.commitTask}} doesn't seem to guard against missing output files.
>
> I'm not sure about this, though, since the stack trace seems to imply that the temporary output file existed. Maybe the filesystem methods are returning stale metadata? Maybe there's a race? A race condition seems pretty unlikely, since the time-scale at which it would have to happen doesn't sync up with the scale of the timestamps that I saw in the user report.
>
> h3. Possible Fixes
>
> The root problem here might be that speculative copies of tasks are somehow allowed to commit their output. We might be able to fix this by centralizing the "should this task commit its output" decision at the driver.
>
> (I have more concrete suggestions of how to do this; to be posted soon)
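To make that last idea a bit more concrete, here is a rough sketch of what centralizing the commit decision at the driver could look like. This is illustration only, not the actual patch; the names ({{CommitCoordinator}}, {{canCommit}}, {{commitTaskOutput}}) are made up, and in a real implementation the {{canCommit}} call would go over the existing driver/executor RPC channel:

{code}
import scala.collection.mutable
import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}

// Hypothetical driver-side arbiter: the first attempt that asks to commit a given
// (stage, partition) wins; any later attempt (e.g. a speculative copy) is refused.
class CommitCoordinator {
  // (stageId, partitionId) -> attemptId of the attempt authorized to commit
  private val authorized = mutable.Map[(Int, Int), Long]()

  def canCommit(stageId: Int, partitionId: Int, attemptId: Long): Boolean = synchronized {
    authorized.get((stageId, partitionId)) match {
      case Some(winner) => winner == attemptId // only the winning attempt may commit
      case None =>
        authorized((stageId, partitionId)) = attemptId
        true
    }
  }
}

// Executor-side commit path, sketched: instead of relying only on
// FileOutputCommitter.needsTaskCommit() (which is also true for a speculative
// attempt whose temporary output exists), ask the driver-side coordinator first.
def commitTaskOutput(
    coordinator: CommitCoordinator,
    committer: FileOutputCommitter,
    taskContext: TaskAttemptContext,
    stageId: Int,
    partitionId: Int,
    attemptId: Long): Unit = {
  if (committer.needsTaskCommit(taskContext) &&
      coordinator.canCommit(stageId, partitionId, attemptId)) {
    committer.commitTask(taskContext)
  } else {
    // Losing (e.g. speculative) attempt: discard its temporary output instead of
    // racing with the winner's committed files.
    committer.abortTask(taskContext)
  }
}
{code}

A real version would also need to handle failures of the winning attempt (so a later attempt can be re-authorized) and clean up state when a stage finishes, but the core idea is simply moving the commit decision from per-task filesystem checks to a single authoritative point on the driver.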