Updated Branches: refs/heads/master 84595ea3e -> c06a307ca
Fail rather than hanging if a task crashes the JVM. Prior to this commit, if a task crashes the JVM, the task (and all other tasks running on that executor) is marked at KILLED rather than FAILED. As a result, the TaskSetManager will retry the task indefiniteily rather than failing the job after maxFailures. This commit fixes that problem by marking tasks as FAILED rather than killed when an executor is lost. The downside of this commit is that if task A fails because another task running on the same executor caused the VM to crash, the failure will incorrectly be counted as a failure of task A. This should not be an issue because we typically set maxFailures to 3, and it is unlikely that a task will be co-located with a JVM-crashing task multiple times. Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a268d634 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a268d634 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a268d634 Branch: refs/heads/master Commit: a268d634113536f7aca11af23619b9713b5ef5de Parents: 5fecd25 Author: Kay Ousterhout <kayousterh...@gmail.com> Authored: Wed Jan 15 16:03:40 2014 -0800 Committer: Kay Ousterhout <kayousterh...@gmail.com> Committed: Wed Jan 15 16:03:40 2014 -0800 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/TaskSetManager.scala | 2 +- .../scala/org/apache/spark/DistributedSuite.scala | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a268d634/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index fc0ee07..5ad00a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -629,7 +629,7 @@ private[spark] class TaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - handleFailedTask(tid, TaskState.KILLED, None) + handleFailedTask(tid, TaskState.FAILED, None) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a268d634/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index d9cb7fe..27c4b01 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -125,6 +125,21 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(thrown.getMessage.contains("failed 4 times")) } + test("repeatedly failing task that crashes JVM") { + // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather + // than hanging. + sc = new SparkContext(clusterUrl, "test") + failAfter(Span(100000, Millis)) { + val thrown = intercept[SparkException] { + // One of the tasks always fails. + sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) } + } + assert(thrown.getClass === classOf[SparkException]) + System.out.println(thrown.getMessage) + assert(thrown.getMessage.contains("failed 4 times")) + } + } + test("caching") { sc = new SparkContext(clusterUrl, "test") val data = sc.parallelize(1 to 1000, 10).cache()