[ https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14279406#comment-14279406 ]

Josh Rosen commented on SPARK-4879:
-----------------------------------

I'm not sure that SparkHadoopWriter's use of FileOutputCommitter properly obeys 
the OutputCommitter contracts in Hadoop.  According to the [OutputCommitter 
Javadoc|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html]:

{quote}
The methods in this class can be called from several different processes and 
from several different contexts. It is important to know which process and 
which context each is called from. Each method should be marked accordingly in 
its documentation. It is also important to note that not all methods are 
guaranteed to be called once and only once. If a method is not guaranteed to 
have this property the output committer needs to handle this appropriately. 
Also note it will only be in rare situations where they may be called multiple 
times for the same task.
{quote}

Based on the documentation, `needsTaskCommit` "is called from each individual 
task's process that will output to HDFS, and it is called just for that task", 
so it seems like it should be safe to call this from SparkHadoopWriter.
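
For reference, the commit path in question boils down to roughly this pattern 
(my paraphrase of the shape of the code, not the exact SparkHadoopWriter source):

{code}
import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

// Rough paraphrase of SparkHadoopWriter's commit path (not the exact code):
// each attempt asks its own committer whether it has output, then commits.
def commit(cmtr: OutputCommitter, taCtxt: TaskAttemptContext): Unit = {
  if (cmtr.needsTaskCommit(taCtxt)) {
    // Note: no coordination with any central authority happens here, so every
    // attempt whose temporary output still exists will go ahead and commit.
    cmtr.commitTask(taCtxt)
  }
}
{code}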

However, maybe we're misusing the `commitTask` method:

{quote}
If needsTaskCommit(TaskAttemptContext) returns true and this task is the task 
that the AM determines finished first, this method is called to commit an 
individual task's output. This is to mark that tasks output as complete, as 
commitJob(JobContext) will also be called later on if the entire job finished 
successfully. This is called from a task's process. This may be called multiple 
times for the same task, but different task attempts. It should be very rare 
for this to be called multiple times and requires odd networking failures to 
make this happen. In the future the Hadoop framework may eliminate this race. 
{quote}

I think that we're missing the "this task is the task that the AM determines 
finished first" part of the equation here.  If `needsTaskCommit` is false, then 
we definitely shouldn't commit (e.g. if it's an original task that lost to a 
speculated copy), but `needsTaskCommit` returning true doesn't by itself make 
it safe to commit; we need some central authority to pick a winner.

Let's see how Hadoop does things, working backwards from actual calls of 
`commitTask` to see whether they're guarded by some coordination through the 
AM.  It looks like `OutputCommitter` is part of the `mapred` API, so I'll only 
look at classes in that package:

In `Task.java`, `committer.commitTask` is only called after checking 
`canCommit` through `TaskUmbilicalProtocol`: 
https://github.com/apache/hadoop/blob/a655973e781caf662b360c96e0fa3f5a873cf676/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java#L1185.
According to the Javadocs for `TaskAttemptListenerImpl.canCommit` (the 
concrete implementation of this method):

{code}
  /**
   * Child checking whether it can commit.
   * 
   * <br/>
   * Commit is a two-phased protocol. First the attempt informs the
   * ApplicationMaster that it is
   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
   * a legacy from the centralized commit protocol handling by the JobTracker.
   */
  @Override
  public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
{code}
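
So, on the task side, the protocol is roughly "announce commit-pending, then 
poll the AM until it grants the commit".  As a sketch of that shape (my 
paraphrase of `Task.java`, not the actual code; the function parameters stand 
in for the real umbilical RPCs):

{code}
import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

// Sketch of the task-side two-phase commit protocol (a paraphrase, not
// Hadoop's actual code). `notifyCommitPending` and `askAmCanCommit` stand in
// for the TaskUmbilicalProtocol.commitPending and canCommit RPCs.
def commitViaAm(cmtr: OutputCommitter,
                taCtxt: TaskAttemptContext,
                notifyCommitPending: () => Unit,
                askAmCanCommit: () => Boolean): Unit = {
  notifyCommitPending()          // phase 1: tell the AM this attempt is ready
  while (!askAmCanCommit()) {    // phase 2: poll until the AM picks a winner
    Thread.sleep(1000)           // (the real code also handles kills/timeouts)
  }
  cmtr.commitTask(taCtxt)        // only the AM-chosen attempt gets here
}
{code}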

That `canCommit` call ends up delegating to `Task.canCommit()`:

{code}
  /**
   * Can the output of the taskAttempt be committed. Note that once the task
   * gives a go for a commit, further canCommit requests from any other attempts
   * should return false.
   * 
   * @param taskAttemptID
   * @return whether the attempt's output can be committed or not.
   */
  boolean canCommit(TaskAttemptId taskAttemptID);
{code}

There's a bunch of tricky logic that involves communication with the AM (see 
`AttemptCommitPendingTransition` and the other transitions in `TaskImpl`), but 
it looks like the gist is that the "winner" is picked by the AM through some 
central coordination process.
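
The essential invariant seems to be "the first attempt to ask wins; any other 
attempt of the same task is refused from then on".  As a sketch of that 
contract (not Hadoop's actual implementation):

{code}
// Sketch of the AM-side arbitration contract (not Hadoop's actual code):
// the first attempt to ask becomes the designated committer for the task;
// every other attempt of that task gets false from then on.
class CommitArbiter[AttemptId] {
  private var chosen: Option[AttemptId] = None

  def canCommit(attempt: AttemptId): Boolean = synchronized {
    chosen match {
      case None         => chosen = Some(attempt); true  // first asker wins
      case Some(winner) => winner == attempt             // everyone else loses
    }
  }
}
{code}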

So, it looks like the right fix is to implement these same state transitions 
ourselves.  It would be nice if there were a clean way to do this that could be 
easily backported to maintenance branches.
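
For example (a hypothetical sketch, not an existing Spark API; 
`canCommitOnDriver` stands in for some driver-side arbiter that executors 
could reach over RPC), SparkHadoopWriter's commit could become:

{code}
import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

// Hypothetical sketch: consult a driver-side arbiter before committing,
// instead of trusting needsTaskCommit alone. `canCommitOnDriver` would
// implement the first-asker-wins contract sketched above, keyed by
// (stage, partition) on the driver.
def coordinatedCommit(cmtr: OutputCommitter,
                      taCtxt: TaskAttemptContext,
                      canCommitOnDriver: () => Boolean): Unit = {
  if (cmtr.needsTaskCommit(taCtxt)) {
    if (canCommitOnDriver()) {
      cmtr.commitTask(taCtxt)
    } else {
      cmtr.abortTask(taCtxt)  // lost the race: discard this attempt's output
    }
  }
}
{code}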

> Missing output partitions after job completes with speculative execution
> ------------------------------------------------------------------------
>
>                 Key: SPARK-4879
>                 URL: https://issues.apache.org/jira/browse/SPARK-4879
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.0.2, 1.1.1, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>         Attachments: speculation.txt, speculation2.txt
>
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that 
> save output files may report that they have completed successfully even 
> though some output partitions written by speculative tasks may be missing.
> h3. Reproduction
> This symptom was reported to me by a Spark user and I've been doing my own 
> investigation to try to come up with an in-house reproduction.
> I'm still working on a reliable local reproduction for this issue, which is a 
> little tricky because Spark won't schedule speculated tasks on the same host 
> as the original task, so you need an actual (or containerized) multi-host 
> cluster to test speculation.  Here's a simple reproduction of some of the 
> symptoms on EC2, which can be run in {{spark-shell}} with {{--conf 
> spark.speculation=true}}:
> {code}
> // Rig a job such that all but one of the tasks complete instantly
> // and one task runs for 20 seconds on its first attempt and instantly
> // on its second attempt:
> val numTasks = 100
> sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
>   if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
>     if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
>       Thread.sleep(20 * 1000)
>     }
>   }
>   iter
> }.map(x => (x, x)).saveAsTextFile("/test4")
> {code}
> When I run this, I end up with a job that completes quickly (due to 
> speculation) but reports failures from the speculated task:
> {code}
> [...]
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
> 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
> 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully
> scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_000049_413
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
>         org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
>         org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> {code}
> One interesting thing to note about this stack trace: if we look at 
> {{FileOutputCommitter.java:160}} 
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]),
>  this point in the execution seems to correspond to a case where a task 
> completes, attempts to commit its output, fails for some reason, then deletes 
> the destination file, tries again, and fails:
> {code}
> 151    if (fs.isFile(taskOutput)) {
> 152      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
> 153                                          getTempTaskOutputPath(context));
> 154      if (!fs.rename(taskOutput, finalOutputPath)) {
> 155        if (!fs.delete(finalOutputPath, true)) {
> 156          throw new IOException("Failed to delete earlier output of task: " +
> 157                                 attemptId);
> 158        }
> 159        if (!fs.rename(taskOutput, finalOutputPath)) {
> 160          throw new IOException("Failed to save output of task: " +
> 161                                attemptId);
> 162        }
> 163      }
> {code}
> This could explain why the output file is missing: the second copy of the 
> task keeps running after the job completes and deletes the output written by 
> the other task after failing to commit its own copy of the output.
> There are still a few open questions about how exactly we get into this 
> scenario:
> *Why is the second copy of the task allowed to commit its output after the 
> other task / the job has successfully completed?*
> To check whether a task's temporary output should be committed, 
> SparkHadoopWriter calls {{FileOutputCommitter.needsTaskCommit()}}, which 
> returns {{true}} if the task's temporary output exists 
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#206]).
> This does not seem to check whether the destination already exists, which 
> means that {{needsTaskCommit}} can return {{true}} for speculative tasks.
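> For reference, a paraphrase of that check (not the actual Hadoop source; the 
> old {{mapred}} committer only inspects the attempt's own temporary path):
> {code}
> // Paraphrase of FileOutputCommitter.needsTaskCommit (mapred API), not the
> // actual source: the check is essentially "does my temp output still exist?"
> def needsTaskCommit(fs: org.apache.hadoop.fs.FileSystem,
>                     taskOutputPath: org.apache.hadoop.fs.Path): Boolean =
>   taskOutputPath != null && fs.exists(taskOutputPath)
> {code}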
> *Why does the rename fail?*
> I think that what's happening is that the temporary task output files are 
> being deleted once the job has completed, which is causing the {{rename}} to 
> fail because {{FileOutputCommitter.commitTask}} doesn't seem to guard against 
> missing output files.
> I'm not sure about this, though, since the stack trace seems to imply that 
> the temporary output file existed.  Maybe the filesystem methods are 
> returning stale metadata?  Maybe there's a race?  I think a race condition 
> seems pretty unlikely, since the time-scale at which it would have to happen 
> doesn't sync up with the scale of the timestamps that I saw in the user 
> report.
> h3. Possible Fixes:
> The root problem here might be that speculative copies of tasks are somehow 
> allowed to commit their output.  We might be able to fix this by centralizing 
> the "should this task commit its output" decision at the driver.
> (I have more concrete suggestions of how to do this; to be posted soon)


