diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a30a501e5d4a1..197f2d34efbc1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -574,7 +574,14 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
         case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
-          val reason = task.context.fetchFailed.get.toTaskFailedReason
+          val (state, reason) = if (task.reasonIfKilled.isDefined) {
+            val killReason = task.reasonIfKilled.getOrElse("unknown reason")
+            val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
+            val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
+            (TaskState.KILLED, serializedTK)
+          } else {
+            (TaskState.FAILED, task.context.fetchFailed.get.toTaskFailedReason)
+          }
           if (!t.isInstanceOf[FetchFailedException]) {
             // there was a fetch failure in the task, but some user code wrapped that exception
             // and threw something else.  Regardless, we treat it as a fetch failure.
@@ -585,7 +592,7 @@ private[spark] class Executor(
               s"other exception: $t")
           }
           setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+          execBackend.statusUpdate(taskId, state, ser.serialize(reason))
 
         case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskCommitDeniedReason


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to