diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a30a501e5d4a1..f92ea7873f9ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -606,17 +606,25 @@ private[spark] class Executor(
if (!ShutdownHookManager.inShutdown()) {
val (accums, accUpdates) =
collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
- val serializedTaskEndReason = {
+ val (state, serializedTaskEndReason) = {
try {
- ser.serialize(new ExceptionFailure(t,
accUpdates).withAccums(accums))
+ if (task.reasonIfKilled.isDefined) {
+ val killReason = task.reasonIfKilled.getOrElse("unknown
reason")
+ val serializedTK = ser.serialize(TaskKilled(killReason),
accUpdates, accums)
+ (TaskState.KILLED, serializedTK)
+ } else {
+ (TaskState.FAILED,
+ ser.serialize(new ExceptionFailure(t,
accUpdates).withAccums(accums)))
+ }
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
- ser.serialize(new ExceptionFailure(t, accUpdates,
false).withAccums(accums))
+ (TaskState.FAILED,
+ ser.serialize(new ExceptionFailure(t, accUpdates,
false).withAccums(accums)))
}
}
setTaskFinishedAndClearInterruptStatus()
- execBackend.statusUpdate(taskId, TaskState.FAILED,
serializedTaskEndReason)
+ execBackend.statusUpdate(taskId, state, serializedTaskEndReason)
} else {
logInfo("Not reporting error to driver during JVM shutdown.")
}
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]