Repository: spark
Updated Branches:
  refs/heads/master 2cd1bfa4f -> 9fcf1c51d
[SPARK-17623][CORE] Clarify type of TaskEndReason with a failed task.

## What changes were proposed in this pull request?

In TaskResultGetter, enqueueFailedTask currently deserializes the result as a
TaskEndReason. But the type is actually more specific: it's a TaskFailedReason.
This just leads to more blind casting later on; it would be clearer if the
message were cast to the right type immediately, so method parameter types
could be tightened.

## How was this patch tested?

Existing unit tests via Jenkins. Note that the code was already performing a
blind cast to a TaskFailedReason before in any case, just in a different spot,
so there shouldn't be any behavior change.

Author: Imran Rashid <iras...@cloudera.com>

Closes #15181 from squito/SPARK-17623.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fcf1c51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fcf1c51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fcf1c51

Branch: refs/heads/master
Commit: 9fcf1c51d518847eda7f5ea71337cfa7def3c45c
Parents: 2cd1bfa
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Sep 21 17:49:36 2016 -0400
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Wed Sep 21 17:49:36 2016 -0400

----------------------------------------------------------------------
 .../apache/spark/executor/CommitDeniedException.scala   |  4 ++--
 .../main/scala/org/apache/spark/executor/Executor.scala |  4 ++--
 .../org/apache/spark/scheduler/TaskResultGetter.scala   |  4 ++--
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  |  2 +-
 .../org/apache/spark/scheduler/TaskSetManager.scala     | 12 +++---------
 .../org/apache/spark/shuffle/FetchFailedException.scala |  4 ++--
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala |  2 +-
 7 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
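For context, here is a minimal sketch of the type-narrowing idea behind the
diff below. These are simplified stand-in definitions, not Spark's real
classes (though in Spark, TaskFailedReason is indeed a sub-trait of
TaskEndReason); the object and method names are hypothetical:

```scala
// Simplified stand-ins for the real hierarchy; illustrative only.
object TypeNarrowingSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  sealed trait TaskFailedReason extends TaskEndReason {
    def toErrorString: String
    def countTowardsTaskFailures: Boolean = true
  }
  case object UnknownReason extends TaskFailedReason {
    override def toErrorString: String = "UnknownReason"
  }

  // Before: the broad parameter type forced a blind runtime cast, which
  // throws ClassCastException if a non-failure reason (e.g. Success) slips in.
  def handleBefore(reason: TaskEndReason): String =
    reason.asInstanceOf[TaskFailedReason].toErrorString

  // After: tightening the parameter moves the check to compile time; the
  // cast disappears, and handleAfter(Success) no longer type-checks.
  def handleAfter(reason: TaskFailedReason): String =
    reason.toErrorString
}
```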
http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index 7d84889..326e042 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{TaskCommitDenied, TaskEndReason}
+import org.apache.spark.{TaskCommitDenied, TaskFailedReason}
 
 /**
  * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
@@ -29,5 +29,5 @@ private[spark] class CommitDeniedException(
     attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
+  def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fbf2b86..668ec41 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -355,7 +355,7 @@ private[spark] class Executor(
 
       } catch {
         case ffe: FetchFailedException =>
-          val reason = ffe.toTaskEndReason
+          val reason = ffe.toTaskFailedReason
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
@@ -370,7 +370,7 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
         case CausedBy(cDE: CommitDeniedException) =>
-          val reason = cDE.toTaskEndReason
+          val reason = cDE.toTaskFailedReason
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 685ef55..1c3fcbd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -118,14 +118,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
-    var reason : TaskEndReason = UnknownReason
+    var reason : TaskFailedReason = UnknownReason
     try {
       getTaskResultExecutor.execute(new Runnable {
         override def run(): Unit = Utils.logUncaughtExceptions {
           val loader = Utils.getContextOrSparkClassLoader
           try {
             if (serializedData != null && serializedData.limit() > 0) {
-              reason = serializer.get().deserialize[TaskEndReason](
+              reason = serializer.get().deserialize[TaskFailedReason](
                 serializedData, loader)
             }
           } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ee5cbfe..52a7186 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,7 +431,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskState: TaskState,
-      reason: TaskEndReason): Unit = synchronized {
+      reason: TaskFailedReason): Unit = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
     if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
       // Need to revive offers again now that the task set manager state has been updated to
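The TaskResultGetter hunk above is the heart of the change: deserializing
straight to the narrower type gives `reason` the static type that the
scheduler-side handlers now expect. Below is a sketch of that pattern using
the stand-in types from the earlier sketch; SerializerSketch and
failureReasonOf are hypothetical names, and Spark's real
SerializerInstance.deserialize additionally takes an implicit ClassTag:

```scala
import java.nio.ByteBuffer

import TypeNarrowingSketch._

// Simplified stand-in for a serializer; illustrative only.
trait SerializerSketch {
  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T
}

object FailedTaskSketch {
  // Same shape as enqueueFailedTask: default to UnknownReason, then narrow
  // the deserialized value at its source instead of casting it later.
  def failureReasonOf(
      ser: SerializerSketch,
      data: ByteBuffer,
      loader: ClassLoader): TaskFailedReason = {
    var reason: TaskFailedReason = UnknownReason
    if (data != null && data.limit() > 0) {
      reason = ser.deserialize[TaskFailedReason](data, loader)
    }
    reason
  }
}
```

Note that the type parameter is only a static claim (it is erased at
runtime), which is why the old code was effectively blind-casting anyway; the
change just moves that claim to the point of deserialization, where the
commit message argues it belongs.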
http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2fef447..226bed2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -696,7 +696,7 @@ private[spark] class TaskSetManager(
    * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
    * DAG Scheduler.
    */
-  def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
+  def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
     val info = taskInfos(tid)
     if (info.failed || info.killed) {
       return
@@ -707,7 +707,7 @@ private[spark] class TaskSetManager(
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
-      reason.asInstanceOf[TaskFailedReason].toErrorString
+      reason.toErrorString
     val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
         logWarning(failureReason)
@@ -765,10 +765,6 @@ private[spark] class TaskSetManager(
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
         None
-
-      case e: TaskEndReason =>
-        logError("Unknown TaskEndReason: " + e)
-        None
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -784,9 +780,7 @@ private[spark] class TaskSetManager(
       addPendingTask(index)
     }
 
-    if (!isZombie && state != TaskState.KILLED
-      && reason.isInstanceOf[TaskFailedReason]
-      && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
+    if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index b2d050b..498c12e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.{FetchFailed, TaskFailedReason}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
@@ -45,7 +45,7 @@ private[spark] class FetchFailedException(
     this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
   }
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+  def toTaskFailedReason: TaskFailedReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
     Utils.exceptionString(this))
 }
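The TaskSetManager hunks show the payoff: once `reason` is statically a
TaskFailedReason, the `case e: TaskEndReason` fallback becomes unreachable and
the isInstanceOf/asInstanceOf pair collapses to a plain member access. A
sketch of that simplification, again with the stand-in types from the first
sketch (`countsBefore`/`countsAfter` are hypothetical names, and `killed`
stands in for `state != TaskState.KILLED`):

```scala
import TypeNarrowingSketch._

object CountFailuresSketch {
  // Before the change: the broad static type forced a runtime type test
  // plus a cast before countTowardsTaskFailures could be read.
  def countsBefore(isZombie: Boolean, killed: Boolean, reason: TaskEndReason): Boolean =
    !isZombie && !killed &&
      reason.isInstanceOf[TaskFailedReason] &&
      reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures

  // After: with the parameter already typed TaskFailedReason, the same
  // logic is a single member access, as in the hunk above.
  def countsAfter(isZombie: Boolean, killed: Boolean, reason: TaskFailedReason): Boolean =
    !isZombie && !killed && reason.countTowardsTaskFailures
}
```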
http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c89be22..00314ab 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -146,7 +146,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15),
       17, 18, 19, "Some exception")
     val fetchMetadataFailed = new MetadataFetchFailedException(17,
-      19, "metadata Fetch failed exception").toTaskEndReason
+      19, "metadata Fetch failed exception").toTaskFailedReason
     val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org