Repository: spark
Updated Branches:
  refs/heads/master 0bfacd5c5 -> 424a86a1e


[SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used

This patch fixes two issues with the executor log viewing links added in Spark 
1.3.  In standalone mode, the log URLs might include a port value of 0 rather 
than the actual bound port of the UI, which broke the ability to view logs from 
workers whose web UIs had been configured to bind to ephemeral ports.  In 
addition, the URLs used workers' local hostnames instead of respecting 
SPARK_PUBLIC_DNS, which prevented this feature from working properly on Spark 
EC2 clusters because the links would point to internal DNS names instead of 
external ones.
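
In short (a condensed sketch of the fix, stitched together from the
Worker.scala and ExecutorRunner.scala hunks below; not the literal patch):

    // Worker.scala: prefer SPARK_PUBLIC_DNS for the externally-visible hostname,
    // reading it through conf.getenv so tests can substitute a fake value.
    val publicAddress = {
      val envVar = conf.getenv("SPARK_PUBLIC_DNS")
      if (envVar != null) envVar else host
    }

    // ExecutorRunner.scala: build the log links from the public address and the
    // UI's actual bound port (the Worker now passes webUi.boundPort, which is
    // the real port even when the UI was configured to bind to port 0).
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")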

I included tests for both of these bugs:

- We now browse to the URLs and verify that they point to the expected pages.
- To test SPARK_PUBLIC_DNS, I changed the code that reads the environment 
variable to go through `SparkConf.getenv`, then used a custom SparkConf subclass 
to mock the environment variable (this pattern is used elsewhere in Spark's 
tests); see the sketch after this list.
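
For reference, the mocking pattern boils down to the following (excerpted,
lightly condensed, from LogUrlsStandaloneSuite in the diff below):

    // A SparkConf subclass that fakes the SPARK_PUBLIC_DNS environment variable.
    // clone must also be overridden because SparkContext clones its conf, and
    // the clone needs to keep returning the fake value.
    val SPARK_PUBLIC_DNS = "public_dns"
    class MySparkConf extends SparkConf(false) {
      override def getenv(name: String): String =
        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
        else super.getenv(name)

      override def clone: SparkConf = new MySparkConf().setAll(getAll)
    }
    sc = new SparkContext("local-cluster[2,1,512]", "test", new MySparkConf())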

Author: Josh Rosen <joshro...@databricks.com>

Closes #4903 from JoshRosen/SPARK-6175 and squashes the following commits:

5577f41 [Josh Rosen] Remove println
cfec135 [Josh Rosen] Use webUi.boundPort and publicAddress in log links
27918c7 [Josh Rosen] Add failing unit tests for standalone log URL viewing
c250fbe [Josh Rosen] Respect SparkConf in local-cluster Workers.
422a2ef [Josh Rosen] Use conf.getenv to read SPARK_PUBLIC_DNS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/424a86a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/424a86a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/424a86a1

Branch: refs/heads/master
Commit: 424a86a1ed2a3e6dd54cf8b09fe2f13a1311b7e6
Parents: 0bfacd5
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Mar 5 12:04:00 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Mar 5 12:04:00 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/LocalSparkCluster.scala |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  2 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |  4 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  9 ++--
 .../main/scala/org/apache/spark/ui/WebUI.scala  |  2 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |  2 +-
 .../spark/deploy/LogUrlsStandaloneSuite.scala   | 54 ++++++++++++++++----
 .../deploy/worker/ExecutorRunnerTest.scala      |  2 +-
 8 files changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 0401b15..3ab425a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4584b73..1581429 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)
 
   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 066d46c..023f3c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
       builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
 
       // Add webUI log urls
-      val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+      val baseUrl =
+        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
       builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
       builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2473a90..f2e7418 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
 
   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
             self,
             workerId,
             host,
-            webUiPort,
+            webUi.boundPort,
+            publicAddress,
             sparkHome,
             executorDir,
             akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 9be65a4..ec68837 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
 
   def getBasePath: String = basePath

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index e955636..68b5776 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createExecutorRunner(): ExecutorRunner = {
    new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), 
"akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index f33bdc7..54dd7c9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -17,35 +17,69 @@
 
 package org.apache.spark.deploy
 
+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 
-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }
 
-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/424a86a1/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 7651169..6fca632 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new 
SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)

