[ https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315077#comment-14315077 ]

Josh Rosen commented on SPARK-4879:
-----------------------------------

This issue is _really_ hard to reproduce, but I managed to trigger the original 
bug as part of the testing for my patch.  Here's what I ran:

{code}
~/spark-1.3.0-SNAPSHOT-bin-1.0.4/bin/spark-shell \
  --conf spark.speculation.multiplier=1 \
  --conf spark.speculation.quantile=0.01 \
  --conf spark.speculation=true \
  --conf spark.hadoop.outputCommitCoordination.enabled=false
{code}
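
For reference, the equivalent configuration when constructing a {{SparkContext}} programmatically instead of passing {{--conf}} flags (just a sketch using the same keys as the invocation above; the {{spark.hadoop.outputCommitCoordination.enabled}} key is specific to the patched build being tested here):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Same settings as the spark-shell invocation above, set on a SparkConf.
val conf = new SparkConf()
  .setAppName("SPARK-4879 repro")
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "1")
  .set("spark.speculation.quantile", "0.01")
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
val sc = new SparkContext(conf)
{code}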

{code}
val numTasks = 100
val numTrials = 100
val outputPath = "/output-committer-bug-"
val sleepDuration = 1000

for (trial <- 0 to (numTrials - 1)) {
  val outputLocation = outputPath + trial
  sc.parallelize(1 to numTasks, numTasks).mapPartitionsWithContext { case (ctx, iter) =>
    if (ctx.partitionId % 5 == 0) {
      if (ctx.attemptNumber == 0) {  // If this is the first attempt, run slow
        Thread.sleep(sleepDuration)
      }
    }
    iter
  }.map(identity).saveAsTextFile(outputLocation)
  Thread.sleep(sleepDuration * 2)
  println("TESTING OUTPUT OF TRIAL " + trial)
  val savedData = sc.textFile(outputLocation).map(_.toInt).collect()
  if (savedData.toSet != (1 to numTasks).toSet) {
    println("MISSING: " + ((1 to numTasks).toSet -- savedData.toSet))
    assert(false)
  }
  println("-" * 80)
}
{code}

It took 22 trials until I actually observed missing output partitions (several 
of the earlier trials threw spurious exceptions but did not end up with missing 
output):

{code}
[...]
15/02/10 22:17:21 INFO scheduler.DAGScheduler: Job 66 finished: saveAsTextFile at <console>:39, took 2.479592 s
15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 75.0 in stage 66.0 (TID 6861, ip-172-31-1-124.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201502102217_0066_m_000075_6861
        at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
        at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
        at org.apache.spark.SparkHadoopWriter.performCommit$1(SparkHadoopWriter.scala:113)
        at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:150)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 80.0 in stage 66.0 (TID 6866, ip-172-31-11-151.us-west-2.compute.internal): java.io.IOException: The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!
        at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
        at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 85.0 in stage 66.0 (TID 6871) on executor ip-172-31-1-124.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!) [duplicate 1]
15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 90.0 in stage 66.0 (TID 6876) on executor ip-172-31-1-124.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!) [duplicate 2]
15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 95.0 in stage 66.0 (TID 6881) on executor ip-172-31-11-151.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!) [duplicate 3]
15/02/10 22:17:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 66.0, whose tasks have all completed, from pool
TESTING OUTPUT OF TRIAL 22
[...]
15/02/10 22:17:23 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool
15/02/10 22:17:23 INFO scheduler.DAGScheduler: Stage 67 (collect at <console>:42) finished in 0.158 s
15/02/10 22:17:23 INFO scheduler.DAGScheduler: Job 67 finished: collect at <console>:42, took 0.165580 s
MISSING: Set(76)
java.lang.AssertionError: assertion failed
[...]
{code}

And I confirmed that the corresponding output file is indeed missing in HDFS:

{code}
~/ephemeral-hdfs/bin/hadoop fs -ls /output-committer-bug-22 | grep part | wc -l
Warning: $HADOOP_HOME is deprecated.

99
{code}
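
For a closer look at exactly which part file is gone, the same check can be run 
from the Spark shell against the Hadoop FileSystem API (a sketch; the path and 
task count are taken from trial 22 above):

{code}
import org.apache.hadoop.fs.{FileSystem, Path}

// List the committed part files for trial 22 and diff against what we expect.
val fs = FileSystem.get(sc.hadoopConfiguration)
val partFiles = fs.listStatus(new Path("/output-committer-bug-22"))
  .map(_.getPath.getName)
  .filter(_.startsWith("part-"))
  .toSet
// 100 tasks should have produced part-00000 through part-00099.
val expected = (0 until 100).map(i => f"part-$i%05d").toSet
println("Missing part files: " + (expected -- partFiles))
{code}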

To test my patch, I'm going to remove the {{spark.hadoop.outputCommitCoordination.enabled=false}} 
flag (so that output commit coordination is enabled) and run this for a huge 
number of trials to make sure that we don't hit the missing-output bug.
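In other words, the same {{spark-shell}} invocation as above, minus the 
disabling flag:

{code}
~/spark-1.3.0-SNAPSHOT-bin-1.0.4/bin/spark-shell \
  --conf spark.speculation.multiplier=1 \
  --conf spark.speculation.quantile=0.01 \
  --conf spark.speculation=true
{code}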

> Missing output partitions after job completes with speculative execution
> ------------------------------------------------------------------------
>
>                 Key: SPARK-4879
>                 URL: https://issues.apache.org/jira/browse/SPARK-4879
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.0.2, 1.1.1, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>         Attachments: speculation.txt, speculation2.txt
>
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that 
> save output files may report that they have completed successfully even 
> though some output partitions written by speculative tasks may be missing.
> h3. Reproduction
> This symptom was reported to me by a Spark user and I've been doing my own 
> investigation to try to come up with an in-house reproduction.
> I'm still working on a reliable local reproduction for this issue, which is a 
> little tricky because Spark won't schedule speculated tasks on the same host 
> as the original task, so you need an actual (or containerized) multi-host 
> cluster to test speculation.  Here's a simple reproduction of some of the 
> symptoms on EC2, which can be run in {{spark-shell}} with {{--conf 
> spark.speculation=true}}:
> {code}
>     // Rig a job such that all but one of the tasks complete instantly
>     // and one task runs for 20 seconds on its first attempt and instantly
>     // on its second attempt:
>     val numTasks = 100
>     sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
>       if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
>         if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
>           Thread.sleep(20 * 1000)
>         }
>       }
>       iter
>     }.map(x => (x, x)).saveAsTextFile("/test4")
> {code}
> When I run this, I end up with a job that completes quickly (due to 
> speculation) but reports failures from the speculated task:
> {code}
> [...]
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
> 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
> 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully
> scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_000049_413
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
>         org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
>         org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> {code}
> One interesting thing to note about this stack trace: if we look at 
> {{FileOutputCommitter.java:160}} 
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]),
>  this point in the execution seems to correspond to a case where a task 
> completes, attempts to commit its output, fails for some reason, then deletes 
> the destination file, tries again, and fails:
> {code}
> 151      if (fs.isFile(taskOutput)) {
> 152        Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
> 153                                            getTempTaskOutputPath(context));
> 154        if (!fs.rename(taskOutput, finalOutputPath)) {
> 155          if (!fs.delete(finalOutputPath, true)) {
> 156            throw new IOException("Failed to delete earlier output of task: " + 
> 157                                   attemptId);
> 158          }
> 159          if (!fs.rename(taskOutput, finalOutputPath)) {
> 160            throw new IOException("Failed to save output of task: " + 
> 161                                   attemptId);
> 162          }
> 163        }
> {code}
> This could explain why the output file is missing: the second copy of the 
> task keeps running after the job completes and deletes the output written by 
> the other task after failing to commit its own copy of the output.
> There are still a few open questions about how exactly we get into this 
> scenario:
> *Why is the second copy of the task allowed to commit its output after the 
> other task / the job has successfully completed?*
> To check whether a task's temporary output should be committed, 
> SparkHadoopWriter calls {{FileOutputCommitter.needsTaskCommit()}}, which 
> returns {{true}} if the task's temporary output exists 
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#206]).
>   This does not seem to check whether the destination already exists, which 
> means that {{needsTaskCommit}} can return {{true}} for speculative tasks.
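> A simplified sketch of that check (not the actual Hadoop source; the method 
> signature is just illustrative of the behavior described above):
> {code}
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> // needsTaskCommit only asks "does this attempt's temporary output exist?",
> // so a speculative attempt that wrote its own temporary files also gets
> // `true`, even if another attempt has already committed the final output.
> def needsTaskCommit(fs: FileSystem, taskTempOutputPath: Path): Boolean =
>   fs.exists(taskTempOutputPath)  // the final destination is never consulted
> {code}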
> *Why does the rename fail?*
> I think that what's happening is that the temporary task output files are 
> being deleted once the job has completed, which is causing the {{rename}} to 
> fail because {{FileOutputCommitter.commitTask}} doesn't seem to guard against 
> missing output files.
> I'm not sure about this, though, since the stack trace seems to imply that 
> the temporary output file existed.  Maybe the filesystem methods are 
> returning stale metadata?  Maybe there's a race?  A race condition seems 
> pretty unlikely, since the time-scale at which it would have to happen 
> doesn't line up with the timestamps that I saw in the user report.
> h3. Possible Fixes:
> The root problem here might be that speculative copies of tasks are somehow 
> allowed to commit their output.  We might be able to fix this by centralizing 
> the "should this task commit its output" decision at the driver.
> (I have more concrete suggestions of how to do this; to be posted soon)
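> As a rough illustration of the idea (hypothetical names; a sketch, not the 
> actual patch), the driver would track the first attempt that asks to commit 
> each partition and refuse all later attempts:
> {code}
> import scala.collection.mutable
> 
> // Driver-side arbiter: the first attempt to request a commit for a given
> // partition wins; every other attempt for that partition is told not to commit.
> class OutputCommitArbiter {
>   private val winners = mutable.Map.empty[Int, Int]  // partitionId -> attemptNumber
> 
>   def canCommit(partitionId: Int, attemptNumber: Int): Boolean = synchronized {
>     winners.get(partitionId) match {
>       case Some(winningAttempt) => winningAttempt == attemptNumber
>       case None =>
>         winners(partitionId) = attemptNumber
>         true
>     }
>   }
> }
> {code}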


