Updated Branches:
  refs/heads/master 84595ea3e -> c06a307ca

Fail rather than hang if a task crashes the JVM.

Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked as KILLED rather
than FAILED.  As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. This
commit fixes that problem by marking tasks as FAILED rather than
KILLED when an executor is lost.

The downside of this commit is that if task A fails because another
task running on the same executor caused the JVM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
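
For context, the retry logic works roughly as sketched below. This is
an illustrative simplification rather than the actual TaskSetManager
source; the names (handleFailedTask, numFailures, maxTaskFailures,
addPendingTask) follow the real code, but the body is condensed:

    // Illustrative: why the task's terminal state matters for retries.
    // A FAILED attempt counts against the per-task failure limit; a KILLED
    // attempt is only re-enqueued, so a JVM crash reported as KILLED would
    // be retried forever and the job would hang instead of failing.
    def handleFailedTask(tid: Long, state: TaskState.TaskState): Unit = {
      val index = taskInfos(tid).index
      addPendingTask(index)               // re-enqueue for another attempt
      if (state == TaskState.FAILED) {
        numFailures(index) += 1
        if (numFailures(index) >= maxTaskFailures) {
          abort("Task %d failed %d times".format(index, maxTaskFailures))
        }
      }
    }

With that distinction in mind, reporting executor-lost tasks as FAILED
lets the existing maxFailures check abort the job, which is what the new
DistributedSuite test below exercises.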


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a268d634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a268d634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a268d634

Branch: refs/heads/master
Commit: a268d634113536f7aca11af23619b9713b5ef5de
Parents: 5fecd25
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Wed Jan 15 16:03:40 2014 -0800
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Wed Jan 15 16:03:40 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSetManager.scala  |  2 +-
 .../scala/org/apache/spark/DistributedSuite.scala    | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a268d634/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07..5ad00a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
+      handleFailedTask(tid, TaskState.FAILED, None)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a268d634/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fe..27c4b01 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,21 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getMessage.contains("failed 4 times"))
   }
 
+  test("repeatedly failing task that crashes JVM") {
+    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+    // than hanging.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        // One of the tasks always fails.
+        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      System.out.println(thrown.getMessage)
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()
