Repository: spark
Updated Branches:
  refs/heads/master 42dfab7d3 -> 46e224aaa


SPARK-2150: Provide direct link to finished application UI in yarn resource manager UI

Use the event logger directory to provide a direct link to finished
application UI in yarn resourcemanager UI.

Author: Rahul Singhal <rahul.sing...@guavus.com>

Closes #1094 from rahulsinghaliitd/SPARK-2150 and squashes the following 
commits:

95f230c [Rahul Singhal] SPARK-2150: Provide direct link to finished application 
UI in yarn resource manager UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46e224aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46e224aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46e224aa

Branch: refs/heads/master
Commit: 46e224aaa26df4b232c5176e98472a902862b76c
Parents: 42dfab7
Author: Rahul Singhal <rahul.sing...@guavus.com>
Authored: Thu Jul 24 09:31:04 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Jul 24 09:31:04 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/history/FsHistoryProvider.scala    |  3 ++-
 .../apache/spark/deploy/history/HistoryPage.scala   |  2 +-
 .../apache/spark/deploy/history/HistoryServer.scala |  4 +++-
 .../org/apache/spark/deploy/master/Master.scala     | 11 +++++++----
 .../spark/scheduler/EventLoggingListener.scala      |  7 +++++++
 .../spark/deploy/yarn/ApplicationMaster.scala       |  4 +++-
 .../apache/spark/deploy/yarn/ExecutorLauncher.scala |  2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala     | 16 ++++++++++++++++
 .../cluster/YarnClientSchedulerBackend.scala        |  3 ++-
 .../spark/deploy/yarn/ApplicationMaster.scala       |  5 +++--
 .../apache/spark/deploy/yarn/ExecutorLauncher.scala |  2 +-
 11 files changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a8c9ac0..01e7065 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
     val ui: SparkUI = if (renderUI) {
         val conf = this.conf.clone()
         val appSecManager = new SecurityManager(conf)
-        new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+        new SparkUI(conf, appSecManager, replayBus, appId,
+          HistoryServer.UI_PATH_PREFIX + s"/$appId")
         // Do not call ui.bind() to avoid creating a new server for each 
application
       } else {
         null

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index a958c83..d7a3e3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("") {
     "Last Updated")
 
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val uiAddress = "/history/" + info.id
+    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
     val startTime = UIUtils.formatDate(info.startTime)
     val endTime = UIUtils.formatDate(info.endTime)
     val duration = UIUtils.formatDuration(info.endTime - info.startTime)

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56b38dd..cacb9da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -114,7 +114,7 @@ class HistoryServer(
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
 
     val contextHandler = new ServletContextHandler
-    contextHandler.setContextPath("/history")
+    contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
     contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
     attachHandler(contextHandler)
   }
@@ -172,6 +172,8 @@ class HistoryServer(
 object HistoryServer extends Logging {
   private val conf = new SparkConf
 
+  val UI_PATH_PREFIX = "/history"
+
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     initSecurity()

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index bb1fcc8..21f8667 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 
ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -664,9 +665,10 @@ private[spark] class Master(
    */
   def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
+    val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     val eventLogDir = app.desc.eventLogDir.getOrElse {
       // Event logging is not enabled for this application
-      app.desc.appUiUrl = "/history/not-found"
+      app.desc.appUiUrl = notFoundBasePath
       return false
     }
     val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
@@ -681,13 +683,14 @@ private[spark] class Master(
       logWarning(msg)
       msg += " Did you specify the correct logging directory?"
       msg = URLEncoder.encode(msg, "UTF-8")
-      app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title"
+      app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
       return false
     }
 
     try {
       val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, 
compressionCodec)
-      val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", 
"/history/" + app.id)
+      val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
+        HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       replayBus.replay()
       appIdToUI(app.id) = ui
       webUi.attachSparkUI(ui)
@@ -702,7 +705,7 @@ private[spark] class Master(
         var msg = s"Exception in replaying log for application $appName!"
         logError(msg, e)
         msg = URLEncoder.encode(msg, "UTF-8")
-        app.desc.appUiUrl = 
s"/history/not-found?msg=$msg&exception=$exception&title=$title"
+        app.desc.appUiUrl = notFoundBasePath + 
s"?msg=$msg&exception=$exception&title=$title"
         false
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a90b0d4..ae6ca9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -64,6 +64,13 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
+   * Return only the unique application directory without the base directory.
+   */
+  def getApplicationLogDir(): String = {
+    name
+  }
+
+  /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression 
library is used.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3ec3648..62b5c3b 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -60,6 +60,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
   private var yarnAllocator: YarnAllocationHandler = _
   private var isFinished: Boolean = false
   private var uiAddress: String = _
+  private var uiHistoryAddress: String = _
   private val maxAppAttempts: Int = 
conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
@@ -237,6 +238,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
 
         if (null != sparkContext) {
           uiAddress = sparkContext.ui.appUIHostPort
+          uiHistoryAddress = 
YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
           this.yarnAllocator = YarnAllocationHandler.newAllocator(
             yarnConf,
             resourceManager,
@@ -360,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
         finishReq.setAppAttemptId(appAttemptId)
         finishReq.setFinishApplicationStatus(status)
         finishReq.setDiagnostics(diagnostics)
-        
finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+        finishReq.setTrackingUrl(uiHistoryAddress)
         resourceManager.finishApplicationMaster(finishReq)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a86ad25..d232c18 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -289,7 +289,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, 
conf: Configuration, sp
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
-    finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", 
""))
+    finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", 
""))
     resourceManager.finishApplicationMaster(finishReq)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 718cb19..e98308c 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringInterner
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
@@ -132,4 +135,17 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
+    val eventLogDir = sc.eventLogger match {
+      case Some(logger) => logger.getApplicationLogDir()
+      case None => ""
+    }
+    val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
+    if (historyServerAddress != "" && eventLogDir != "") {
+      historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
+    } else {
+      ""
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d8266f7..77b91f8 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, 
ExecutorLauncher, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 import scala.collection.mutable.ArrayBuffer
@@ -54,6 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
+    conf.set("spark.driver.appUIHistoryAddress", 
YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eaf594c..035356d 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -59,6 +59,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
   private var yarnAllocator: YarnAllocationHandler = _
   private var isFinished: Boolean = false
   private var uiAddress: String = _
+  private var uiHistoryAddress: String = _
   private val maxAppAttempts: Int = conf.getInt(
     YarnConfiguration.RM_AM_MAX_ATTEMPTS, 
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
   private var isLastAMRetry: Boolean = true
@@ -216,6 +217,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
 
         if (sparkContext != null) {
           uiAddress = sparkContext.ui.appUIHostPort
+          uiHistoryAddress = 
YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
           this.yarnAllocator = YarnAllocationHandler.newAllocator(
             yarnConf,
             amClient,
@@ -312,8 +314,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
 
       logInfo("Unregistering ApplicationMaster with " + status)
       if (registered) {
-        val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
-        amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl)
+        amClient.unregisterApplicationMaster(status, diagnostics, 
uiHistoryAddress)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/46e224aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 5ac95f3..7158d94 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -250,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, 
conf: Configuration, sp
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
     logInfo("Unregistering ApplicationMaster with " + status)
-    val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+    val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
     amClient.unregisterApplicationMaster(status, "" /* appMessage */ , 
trackingUrl)
   }
 

Reply via email to