Repository: spark Updated Branches: refs/heads/master 6181577e9 -> 4e6a7a0b3
[SPARK-4166][Core][WebUI] Display the executor ID in the Web UI when ExecutorLostFailure happens Now when ExecutorLostFailure happens, it only displays `ExecutorLostFailure (executor lost)`. Adding the executor id will help locate the faulted executor. Author: zsxwing <[email protected]> Closes #3033 from zsxwing/SPARK-4166 and squashes the following commits: ff4664c [zsxwing] Backward-compatible support c5c4cf2 [zsxwing] Display the executor ID in the Web UI when ExecutorLostFailure happens Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e6a7a0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e6a7a0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e6a7a0b Branch: refs/heads/master Commit: 4e6a7a0b3e55098374a22f3ae9500404f7e4e91a Parents: 6181577 Author: zsxwing <[email protected]> Authored: Sun Nov 2 10:44:52 2014 -0800 Committer: Josh Rosen <[email protected]> Committed: Sun Nov 2 10:44:52 2014 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 4 ++-- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 8 ++++++-- .../org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 5 +++-- 5 files changed, 13 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4e6a7a0b/core/src/main/scala/org/apache/spark/TaskEndReason.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 8f0c5e7..202fba6 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -117,8 +117,8 @@ case object TaskKilled extends TaskFailedReason { * the task crashed the JVM. */ @DeveloperApi -case object ExecutorLostFailure extends TaskFailedReason { - override def toErrorString: String = "ExecutorLostFailure (executor lost)" +case class ExecutorLostFailure(execId: String) extends TaskFailedReason { + override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)" } /** http://git-wip-us.apache.org/repos/asf/spark/blob/4e6a7a0b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a976734..d8fb640 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -732,7 +732,7 @@ private[spark] class TaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure) + handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId)) } // recalculate valid locality levels and waits when executor is lost recomputeLocality() http://git-wip-us.apache.org/repos/asf/spark/blob/4e6a7a0b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5b2e7d3..43c7fba 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -272,7 +272,7 @@ private[spark] object JsonProtocol { def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) - val json = taskEndReason match { + val json: JObject = taskEndReason match { case fetchFailed: FetchFailed => val blockManagerAddress = Option(fetchFailed.bmAddress). map(blockManagerIdToJson).getOrElse(JNothing) @@ -287,6 +287,8 @@ private[spark] object JsonProtocol { ("Description" -> exceptionFailure.description) ~ ("Stack Trace" -> stackTrace) ~ ("Metrics" -> metrics) + case ExecutorLostFailure(executorId) => + ("Executor ID" -> executorId) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -636,7 +638,9 @@ private[spark] object JsonProtocol { new ExceptionFailure(className, description, stackTrace, metrics) case `taskResultLost` => TaskResultLost case `taskKilled` => TaskKilled - case `executorLostFailure` => ExecutorLostFailure + case `executorLostFailure` => + val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String]) + ExecutorLostFailure(executorId.getOrElse("Unknown")) case `unknownReason` => UnknownReason } } http://git-wip-us.apache.org/repos/asf/spark/blob/4e6a7a0b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 3370dd4..6567c5a 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -119,7 +119,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc new ExceptionFailure("Exception", "description", null, None), TaskResultLost, TaskKilled, - ExecutorLostFailure, + ExecutorLostFailure("0"), UnknownReason) var failCount = 0 for (reason <- taskFailedReasons) { http://git-wip-us.apache.org/repos/asf/spark/blob/4e6a7a0b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index f1f88c5..d235d7a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -115,7 +115,7 @@ class JsonProtocolSuite extends FunSuite { testTaskEndReason(exceptionFailure) testTaskEndReason(TaskResultLost) testTaskEndReason(TaskKilled) - testTaskEndReason(ExecutorLostFailure) + testTaskEndReason(ExecutorLostFailure("100")) testTaskEndReason(UnknownReason) // BlockId @@ -403,7 +403,8 @@ class JsonProtocolSuite extends FunSuite { assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals) case (TaskResultLost, TaskResultLost) => case (TaskKilled, TaskKilled) => - case (ExecutorLostFailure, ExecutorLostFailure) => + case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) => + assert(execId1 === execId2) case (UnknownReason, UnknownReason) => case _ => fail("Task end reasons don't match in types!") } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
