Repository: spark Updated Branches: refs/heads/master acef51def -> c1b74df60
[SPARK-5771] Master UI inconsistently displays application cores If the user calls `sc.stop()`, then the number of cores under "Completed Applications" will be 0. If the user does not call `sc.stop()`, then the number of cores will be however many cores were being used before the application exited. This PR makes both cases have the behavior of the latter. Note that there has been a series of PRs that attempted to fix this. For the full discussion, please refer to #4841. The unregister event is necessary because of a subtle race condition explained in that PR. Tested this locally with and without calling `sc.stop()`. Author: Andrew Or <[email protected]> Closes #5177 from andrewor14/master-ui-cores and squashes the following commits: 62449d1 [Andrew Or] Freeze application state before finishing it Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1b74df6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1b74df6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1b74df6 Branch: refs/heads/master Commit: c1b74df6042b33b2b061cb07c2fbd82dba9074bb Parents: acef51d Author: Andrew Or <[email protected]> Authored: Wed Mar 25 13:28:32 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Wed Mar 25 13:28:32 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/deploy/DeployMessage.scala | 2 ++ .../scala/org/apache/spark/deploy/client/AppClient.scala | 1 + .../org/apache/spark/deploy/master/ApplicationInfo.scala | 4 ++++ .../scala/org/apache/spark/deploy/master/Master.scala | 10 +++++++++- 4 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c1b74df6/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 0997507..9db6fd1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -101,6 +101,8 @@ private[deploy] object DeployMessages { case class RegisterApplication(appDescription: ApplicationDescription) extends DeployMessage + case class UnregisterApplication(appId: String) + case class MasterChangeAcknowledged(appId: String) // Master to AppClient http://git-wip-us.apache.org/repos/asf/spark/blob/c1b74df6/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 3b72972..4f06d7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -157,6 +157,7 @@ private[spark] class AppClient( case StopAppClient => markDead("Application has been stopped.") + master ! UnregisterApplication(appId) sender ! 
true context.stop(self) } http://git-wip-us.apache.org/repos/asf/spark/blob/c1b74df6/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index f979ffa..bc5b293 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -111,6 +111,10 @@ private[deploy] class ApplicationInfo( endTime = System.currentTimeMillis() } + private[master] def isFinished: Boolean = { + state != ApplicationState.WAITING && state != ApplicationState.RUNNING + } + def duration: Long = { if (endTime != -1) { endTime - startTime http://git-wip-us.apache.org/repos/asf/spark/blob/c1b74df6/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8050662..9a5d587 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -339,7 +339,11 @@ private[master] class Master( if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app logInfo(s"Removing executor ${exec.fullId} because it is $state") - appInfo.removeExecutor(exec) + // If an application has already finished, preserve its + // state to display its information properly on the UI + if (!appInfo.isFinished) { + appInfo.removeExecutor(exec) + } exec.worker.removeExecutor(exec) val normalExit = exitStatus == Some(0) @@ -428,6 +432,10 @@ private[master] class Master( if (canCompleteRecovery) { completeRecovery() } } + case 
UnregisterApplication(applicationId) => + logInfo(s"Received unregister request from application $applicationId") + idToApp.get(applicationId).foreach(finishApplication) + case DisassociatedEvent(_, address, _) => { // The disconnected client could've been either a worker or an app; remove whichever it was logInfo(s"$address got disassociated, removing it.") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
