diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a30a501e5d4a1..f92ea7873f9ce 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -606,17 +606,25 @@ private[spark] class Executor( if (!ShutdownHookManager.inShutdown()) { val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime) - val serializedTaskEndReason = { + val (state, serializedTaskEndReason) = { try { - ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) + if (task.reasonIfKilled.isDefined) { + val killReason = task.reasonIfKilled.getOrElse("unknown reason") + val serializedTK = ser.serialize(TaskKilled(killReason), accUpdates, accums) + (TaskState.KILLED, serializedTK) + } else { + (TaskState.FAILED, + ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))) + } } catch { case _: NotSerializableException => // t is not serializable so just send the stacktrace - ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) + (TaskState.FAILED, + ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))) } } setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) + execBackend.statusUpdate(taskId, state, serializedTaskEndReason) } else { logInfo("Not reporting error to driver during JVM shutdown.") }
With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org