[
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zach Fry updated SPARK-4879:
----------------------------
Attachment: speculation2.txt
speculation.txt
Hey Josh,
I have been playing around with your repro above, and I think I can consistently
trigger the bad behavior just by tweaking the values of
{{spark.speculation.multiplier}} and {{spark.speculation.quantile}}.
I set the {{multiplier}} to 1 and the {{quantile}} to 0.01, so that once only 1%
of tasks have finished, any task running longer than the median runtime of those
finished tasks becomes eligible for speculation.
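For reference, the same settings in code form (just a sketch of the values used in this
test; they can equally be passed as {{--conf}} flags to {{spark-shell}}):
{code}
// Settings used for this test (values from the description above).
// Equivalent command line (assumption: launching via spark-shell):
//   spark-shell --conf spark.speculation=true \
//               --conf spark.speculation.multiplier=1 \
//               --conf spark.speculation.quantile=0.01
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // enable speculative execution
  .set("spark.speculation.multiplier", "1")    // speculate anything slower than 1x the median
  .set("spark.speculation.quantile", "0.01")   // ...once 1% of the tasks have finished
{code}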
As expected, I see a lot of tasks getting speculated.
After running the repro about 5 times, I have seen 2 errors (stack traces are at
the bottom of this comment, and the full runs from the REPL are attached).
One thing I do notice is that the part-00000 file associated with Stage 1 was always
where I expected it to be in HDFS, and all of its lines were present (checked with
{{wc -l}}).
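(For what it's worth, the same sanity check can also be done from inside the REPL; a
rough sketch, using {{/test6}}, the output directory from the first run below:)
{code}
// Rough line-count check of the committed output from inside the REPL;
// "/test6" is the output directory used in the first run below.
val lines = sc.textFile("/test6/part-00000").count()
println(s"part-00000 contains $lines lines")
{code}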
{code}
scala> 15/01/07 13:44:26 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 119, <redacted-host-02>): java.io.IOException: The temporary job-output directory hdfs://<redacted-host-01>:8020/test6/_temporary doesn't exist!
org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:240)
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:89)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:980)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
{code}
{code}
15/01/07 15:17:39 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 120, <redacted-host-03>): org.apache.hadoop.ipc.RemoteException: No lease on /test7/_temporary/_attempt_201501071517_0000_m_000000_120/part-00000: File does not exist. Holder DFSClient_NONMAPREDUCE_-469253416_73 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2609)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:2426)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2339)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:501)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:299)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44954)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1752)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1748)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1746)
org.apache.hadoop.ipc.Client.call(Client.java:1238)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
com.sun.proxy.$Proxy9.addBlock(Unknown Source)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
com.sun.proxy.$Proxy9.addBlock(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:291)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1177)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1030)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
{code}
> Missing output partitions after job completes with speculative execution
> ------------------------------------------------------------------------
>
> Key: SPARK-4879
> URL: https://issues.apache.org/jira/browse/SPARK-4879
> Project: Spark
> Issue Type: Bug
> Components: Input/Output, Spark Core
> Affects Versions: 1.0.2, 1.1.1, 1.2.0
> Reporter: Josh Rosen
> Assignee: Josh Rosen
> Priority: Critical
> Attachments: speculation.txt, speculation2.txt
>
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that
> save output files may report that they have completed successfully even
> though some output partitions written by speculative tasks may be missing.
> h3. Reproduction
> This symptom was reported to me by a Spark user and I've been doing my own
> investigation to try to come up with an in-house reproduction.
> I'm still working on a reliable local reproduction for this issue, which is a
> little tricky because Spark won't schedule speculated tasks on the same host
> as the original task, so you need an actual (or containerized) multi-host
> cluster to test speculation. Here's a simple reproduction of some of the
> symptoms on EC2, which can be run in {{spark-shell}} with {{--conf
> spark.speculation=true}}:
> {code}
> // Rig a job such that all but one of the tasks complete instantly
> // and one task runs for 20 seconds on its first attempt and instantly
> // on its second attempt:
> val numTasks = 100
> sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
>   if (ctx.partitionId == 0) { // If this is the one task that should run really slow
>     if (ctx.attemptId == 0) { // If this is the first attempt, run slow
>       Thread.sleep(20 * 1000)
>     }
>   }
>   iter
> }.map(x => (x, x)).saveAsTextFile("/test4")
> {code}
> When I run this, I end up with a job that completes quickly (due to
> speculation) but reports failures from the speculated task:
> {code}
> [...]
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
> 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
> 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully
> scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_000049_413
> org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
> org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
> org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
> org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {code}
> One interesting thing to note about this stack trace: if we look at
> {{FileOutputCommitter.java:160}}
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]),
> this point in the execution seems to correspond to a case where a task
> completes, attempts to commit its output, fails for some reason, then deletes
> the destination file, tries again, and fails:
> {code}
> 151       if (fs.isFile(taskOutput)) {
> 152         Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
> 153             getTempTaskOutputPath(context));
> 154         if (!fs.rename(taskOutput, finalOutputPath)) {
> 155           if (!fs.delete(finalOutputPath, true)) {
> 156             throw new IOException("Failed to delete earlier output of task: " +
> 157                 attemptId);
> 158           }
> 159           if (!fs.rename(taskOutput, finalOutputPath)) {
> 160             throw new IOException("Failed to save output of task: " +
> 161                 attemptId);
> 162           }
> 163         }
> {code}
> This could explain why the output file is missing: the second copy of the
> task keeps running after the job completes and deletes the output written by
> the other task after failing to commit its own copy of the output.
> There are still a few open questions about how exactly we get into this
> scenario:
> *Why is the second copy of the task allowed to commit its output after the
> other task / the job has successfully completed?*
> To check whether a task's temporary output should be committed,
> SparkHadoopWriter calls {{FileOutputCommitter.needsTaskCommit()}}, which
> returns {{true}} if the task's temporary output exists
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#206]).
> This does not seem to check whether the destination already exists, which
> means that {{needsTaskCommit}} can return {{true}} for speculative tasks.
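> For illustration, here is the gist of that check, paraphrased in Scala (the real
> implementation is Hadoop Java code; the method name here is made up):
> {code}
> import org.apache.hadoop.fs.{FileSystem, Path}
> // Paraphrase of the needsTaskCommit behavior described above: commit is
> // considered necessary whenever the task's temporary output exists; the
> // final destination is never consulted, so a speculative attempt whose
> // partition has already been committed can still return true.
> def needsTaskCommitLike(fs: FileSystem, tempTaskOutput: Path): Boolean =
>   fs.exists(tempTaskOutput)
> {code}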
> *Why does the rename fail?*
> I think that what's happening is that the temporary task output files are
> being deleted once the job has completed, which is causing the {{rename}} to
> fail because {{FileOutputCommitter.commitTask}} doesn't seem to guard against
> missing output files.
> I'm not sure about this, since the stack trace seems to imply that the
> temporary output file existed. Maybe the filesystem methods are returning
> stale metadata? Maybe there's a race? A race condition seems pretty unlikely
> to me, though, since the timescale at which it would have to happen doesn't
> line up with the timestamps that I saw in the user report.
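> One piece of circumstantial support for that theory: the old {{FileSystem.rename}}
> API seems to report failure (including a missing source) by returning {{false}}
> rather than throwing, which is exactly the branch that produces the "Failed to save
> output of task" message above. A tiny illustrative sketch (assumption: run against
> the same cluster's default filesystem; the paths are made up):
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> // If the source path has been deleted out from under us, rename just returns
> // false, so commitTask can only report a generic "Failed to save output" error.
> val fs = FileSystem.get(new Configuration())
> val missing = new Path("/tmp/does-not-exist-" + System.nanoTime())
> val dest = new Path("/tmp/rename-dest-" + System.nanoTime())
> println(fs.rename(missing, dest)) // expected: false
> {code}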
> h3. Possible Fixes:
> The root problem here might be that speculative copies of tasks are somehow
> allowed to commit their output. We might be able to fix this by centralizing
> the "should this task commit its output" decision at the driver.
> (I have more concrete suggestions for how to do this; to be posted soon.)
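> To make that direction concrete, here is a very rough sketch of driver-side
> arbitration (hypothetical names, not the actual proposal): the first attempt to ask
> about a given partition wins, and later speculative attempts are told not to commit.
> {code}
> import scala.collection.mutable
> // Hypothetical sketch of a driver-side commit arbiter. Each task would ask the
> // driver (e.g. over an RPC) before calling FileOutputCommitter.commitTask.
> class CommitArbiter {
>   private val winners = mutable.Map.empty[Int, Long] // partitionId -> winning attemptId
>   def canCommit(partitionId: Int, attemptId: Long): Boolean = synchronized {
>     winners.get(partitionId) match {
>       case Some(winner) => winner == attemptId // only the first asker may commit
>       case None =>
>         winners(partitionId) = attemptId
>         true
>     }
>   }
> }
> {code}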