Repository: spark
Updated Branches:
  refs/heads/branch-1.1 48a07490f -> 0f947f123


[SPARK-2886] Use more specific actor system name than "spark"

As of #1777 we log the name of the actor system when it binds to a port. The 
current name "spark" is too generic to convey any meaning. For instance, the 
following lines are taken from my driver log after setting `spark.driver.port` 
to 5001.
```
14/08/13 19:33:29 INFO Remoting: Remoting started; listening on addresses:
[akka.tcp://spark@andrews-mbp:5001]
14/08/13 19:33:29 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@andrews-mbp:5001]
14/08/06 13:40:05 INFO Utils: Successfully started service 'spark' on port 5001.
```
This commit renames the actor system to "sparkDriver" on the driver and 
"sparkExecutor" on the executors. The goal of this unambitious PR is simply to 
make the logged information more explicit without introducing any change in 
functionality.
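
The actor system name is also part of the Akka URL that executors use to reach 
the driver, which is why the scheduler backends change below. A minimal sketch 
of the URL before and after this change (the host, port, and actor name values 
here are illustrative placeholders, not taken from a real run):
```
// Hypothetical values, for illustration only.
val driverHost = "andrews-mbp"
val driverPort = 5001
val actorName  = "CoarseGrainedScheduler" // stands in for CoarseGrainedSchedulerBackend.ACTOR_NAME

// Before: the actor system on both driver and executors was named "spark".
val oldUrl = "akka.tcp://%s@%s:%s/user/%s".format("spark", driverHost, driverPort.toString, actorName)
// oldUrl == "akka.tcp://spark@andrews-mbp:5001/user/CoarseGrainedScheduler"

// After: executors address the driver by SparkEnv.driverActorSystemName.
val newUrl = "akka.tcp://%s@%s:%s/user/%s".format("sparkDriver", driverHost, driverPort.toString, actorName)
// newUrl == "akka.tcp://sparkDriver@andrews-mbp:5001/user/CoarseGrainedScheduler"
```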

Author: Andrew Or <andrewo...@gmail.com>

Closes #1810 from andrewor14/service-name and squashes the following commits:

8c459ed [Andrew Or] Use a common variable for driver/executor actor system names
3a92843 [Andrew Or] Change actor name to sparkDriver and sparkExecutor
921363e [Andrew Or] Merge branch 'master' of github.com:apache/spark into service-name
c8c6a62 [Andrew Or] Do not include hyphens in actor name
1c1b42e [Andrew Or] Avoid spaces in akka system name
f644b55 [Andrew Or] Use more specific service name

(cherry picked from commit b21ae5bbb9baa966f69303a30659aa8bbb2098da)
Signed-off-by: Andrew Or <andrewo...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f947f12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f947f12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f947f12

Branch: refs/heads/branch-1.1
Commit: 0f947f1239831a6ed3b47af65816715999bbe57b
Parents: 48a0749
Author: Andrew Or <andrewo...@gmail.com>
Authored: Mon Aug 25 23:36:09 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Mon Aug 25 23:36:21 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala       |  9 ++++++---
 .../spark/scheduler/cluster/SimrSchedulerBackend.scala    |  8 +++++---
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala   |  8 +++++---
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala       |  5 +++--
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala |  5 +++--
 .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 10 +++++-----
 .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala   |  9 ++++++---
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala  |  8 +++++---
 .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala   |  9 ++++++---
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala  |  5 +++--
 10 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index fc36e37..7271656 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
   private val env = new ThreadLocal[SparkEnv]
   @volatile private var lastSetSparkEnv : SparkEnv = _
 
+  private[spark] val driverActorSystemName = "sparkDriver"
+  private[spark] val executorActorSystemName = "sparkExecutor"
+
   def set(e: SparkEnv) {
     lastSetSparkEnv = e
     env.set(e)
@@ -146,9 +149,9 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
-
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
-      securityManager = securityManager)
+    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      actorSystemName, hostname, port, conf, securityManager)
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index d99c761..4f7133c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()
 
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sc.conf.get("spark.driver.host"),
+      sc.conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     val conf = new Configuration()

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 589dba2..32138e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()
 
     // The endpoint for executors to talk to us
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      conf.get("spark.driver.host"),
+      conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9f45400..f017250 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index d6afb73..e2d32c8 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
 
 /**
  * Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
   }
 
   def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
+    val driverActorSystemName = SparkEnv.driverActorSystemName
     val driverHost: String = conf.get("spark.driver.host", "localhost")
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
-    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index d934b9c..53a3e62 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await
 
 import akka.actor.{Actor, Props}
 import akka.pattern.ask
 
+import com.google.common.base.Throwables
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler.DeregisterReceiver
 import org.apache.spark.streaming.scheduler.AddBlock
-import scala.Some
 import org.apache.spark.streaming.scheduler.RegisterReceiver
-import com.google.common.base.Throwables
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl(
   private val trackerActor = {
     val ip = env.conf.get("spark.driver.host", "localhost")
     val port = env.conf.getInt("spark.driver.port", 7077)
-    val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
+    val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
+      SparkEnv.driverActorSystemName, ip, port)
     env.actorSystem.actorSelection(url)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index c3310fb..155dd88 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -210,8 +210,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host",  driverHost)
     sparkConf.set("spark.driver.port",  driverPort.toString)
 
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 80e0162..568a6ef 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler(
           // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
           // (executorIdCounter)
           val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            sparkConf.get("spark.driver.host"), 
sparkConf.get("spark.driver.port"),
+          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+            SparkEnv.driverActorSystemName,
+            sparkConf.get("spark.driver.host"),
+            sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("launching container on " + containerId + " host " + 
executorHostname)

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 45925f1..e093fe4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -174,8 +174,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f947f12/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 29ccec2..0a46174 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler(
           numExecutorsRunning.decrementAndGet()
         } else {
           val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+            SparkEnv.driverActorSystemName,
             sparkConf.get("spark.driver.host"),
             sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)

