Updated Branches:
  refs/heads/master ed25105fd -> dfd40e9f6

Don't retry tasks when they fail due to a NotSerializableException


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29c88e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29c88e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29c88e40

Branch: refs/heads/master
Commit: 29c88e408ecc3416104530756561fee482393913
Parents: 1a4cfbe
Author: Kay Ousterhout <[email protected]>
Authored: Thu Nov 14 15:15:19 2013 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Thu Nov 14 15:15:19 2013 -0800

----------------------------------------------------------------------
 .../spark/scheduler/cluster/ClusterTaskSetManager.scala     | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c88e40/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index ee47aaf..4c5eca8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.io.NotSerializableException
 import java.util.Arrays
 
 import scala.collection.mutable.ArrayBuffer
@@ -484,6 +485,14 @@ private[spark] class ClusterTaskSetManager(
 
         case ef: ExceptionFailure =>
           sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          if (ef.className == classOf[NotSerializableException].getName()) {
+            // If the task result wasn't serializable, there's no point in trying to re-execute it.
+            logError("Task %s:%s had a not serializable result: %s; not retrying".format(
+              taskSet.id, index, ef.description))
+            abort("Task %s:%s had a not serializable result: %s".format(
+              taskSet.id, index, ef.description))
+            return
+          }
           val key = ef.description
           val now = clock.getTime()
           val (printFull, dupCount) = {

Reply via email to