Updated Branches: refs/heads/master ed25105fd -> dfd40e9f6
Don't retry tasks when they fail due to a NotSerializableException

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29c88e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29c88e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29c88e40

Branch: refs/heads/master
Commit: 29c88e408ecc3416104530756561fee482393913
Parents: 1a4cfbe
Author: Kay Ousterhout <[email protected]>
Authored: Thu Nov 14 15:15:19 2013 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Thu Nov 14 15:15:19 2013 -0800

----------------------------------------------------------------------
 .../spark/scheduler/cluster/ClusterTaskSetManager.scala | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c88e40/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index ee47aaf..4c5eca8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.io.NotSerializableException
 import java.util.Arrays
 
 import scala.collection.mutable.ArrayBuffer
@@ -484,6 +485,14 @@ private[spark] class ClusterTaskSetManager(
 
       case ef: ExceptionFailure =>
         sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+        if (ef.className == classOf[NotSerializableException].getName()) {
+          // If the task result wasn't serializable, there's no point in trying to re-execute it.
+          logError("Task %s:%s had a not serializable result: %s; not retrying".format(
+            taskSet.id, index, ef.description))
+          abort("Task %s:%s had a not serializable result: %s".format(
+            taskSet.id, index, ef.description))
+          return
+        }
         val key = ef.description
         val now = clock.getTime()
         val (printFull, dupCount) = {
