Repository: spark
Updated Branches:
  refs/heads/master 4d26aca77 -> 186b497c9


[SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode

The goal of this patch is to fix the swapped arguments passed to
CoarseGrainedExecutorBackend in standalone mode. The swap was introduced by
https://github.com/apache/spark/commit/79e45c9323455a51f25ed9acd0edd8682b4bbb88#diff-79391110e9f26657e415aa169a004998R153.

More details can be found in the JIRA: 
[SPARK-3921](https://issues.apache.org/jira/browse/SPARK-3921)

Tested in standalone mode only; Mesos mode was not tested.

Author: Aaron Davidson <aa...@databricks.com>

Closes #2779 from aarondav/fix-standalone and squashes the following commits:

725227a [Aaron Davidson] Fix ExecutorRunnerTest
9d703fe [Aaron Davidson] [SPARK-3921] Fix CoarseGrainedExecutorBackend's 
arguments for Standalone mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/186b497c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/186b497c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/186b497c

Branch: refs/heads/master
Commit: 186b497c945cc7bbe7a21fef56a948dd1fd10774
Parents: 4d26aca
Author: Aaron Davidson <aa...@databricks.com>
Authored: Mon Oct 13 23:31:37 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Mon Oct 13 23:31:37 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/worker/ExecutorRunner.scala   |  3 ++-
 .../spark/executor/CoarseGrainedExecutorBackend.scala     |  3 +++
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala   |  3 ++-
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala       |  8 ++++----
 .../apache/spark/deploy/worker/ExecutorRunnerTest.scala   | 10 ++++------
 5 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/186b497c/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 71650cd..71d7385 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -111,13 +111,14 @@ private[spark] class ExecutorRunner(
     case "{{EXECUTOR_ID}}" => execId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
+    case "{{APP_ID}}" => appId
     case other => other
   }
 
   def getCommandSeq = {
     val command = Command(
       appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+      appDesc.command.arguments.map(substituteVariables),
       appDesc.command.environment,
       appDesc.command.classPathEntries,
       appDesc.command.libraryPathEntries,

http://git-wip-us.apache.org/repos/asf/spark/blob/186b497c/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 06061ed..c40a3e1 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> 
<hostname> " +
           "<cores> <appid> [<workerUrl>] ")
         System.exit(1)
+
+      // NB: These arguments are provided by SparkDeploySchedulerBackend (for 
standalone mode)
+      // and CoarseMesosSchedulerBackend (for mesos mode).
       case 5 =>
         run(args(0), args(1), args(2), args(3).toInt, args(4), None)
       case x if x > 5 =>

http://git-wip-us.apache.org/repos/asf/spark/blob/186b497c/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ed209d1..8c7de75 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", 
"{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", 
"{{APP_ID}}",
+      "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val classPathEntries = 
sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>

http://git-wip-us.apache.org/repos/asf/spark/blob/186b497c/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9082857..d7f88de 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, 
"./bin/spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s 
%s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, 
numCores))
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s 
%s %d %s".format(
+          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, 
numCores, appId))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
         ("cd %s*; " +
-          "./bin/spark-class 
org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
+          "./bin/spark-class 
org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
           .format(basename, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores))
+            offer.getHostname, numCores, appId))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()

http://git-wip-us.apache.org/repos/asf/spark/blob/186b497c/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 39ab53c..5e2592e 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,14 +26,12 @@ import org.apache.spark.SparkConf
 
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
-    def f(s:String) = new File(s)
+    val appId = "12345-worker321-9876"
     val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-    val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", 
"worker321", f(sparkHome),
-      f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
-
+      Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", 
"worker321",
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, 
ExecutorState.RUNNING)
     assert(er.getCommandSeq.last === appId)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to