Repository: spark
Updated Branches:
  refs/heads/master 520087224 -> 4bec84b6a


SPARK-1569 Spark on Yarn, authentication broken by pr299

Pass the configs as Java options since the executor needs to know before it
registers whether to create the connection using authentication or not. We
could see about passing only the authentication configs but for now I just had
it pass them all.

I also updated it to use a list to construct the command, to make it the same
as ClientBase and avoid any issues with spaces.

Author: Thomas Graves <[email protected]>

Closes #649 from tgravescs/SPARK-1569 and squashes the following commits:

0178ab8 [Thomas Graves] add akka settings
22a8735 [Thomas Graves] Change to only pass spark.auth* configs
8ccc1d4 [Thomas Graves] SPARK-1569 Spark on Yarn, authentication broken


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bec84b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bec84b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bec84b6

Branch: refs/heads/master
Commit: 4bec84b6a23e1e642708a70a6c7ef7b3d1df9b3e
Parents: 5200872
Author: Thomas Graves <[email protected]>
Authored: Wed May 7 15:51:53 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed May 7 15:51:53 2014 -0700

----------------------------------------------------------------------
 .../deploy/yarn/ExecutorRunnableUtil.scala      | 49 ++++++++++++--------
 1 file changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4bec84b6/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 96f8aa9..32f8861 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.URI
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api._
@@ -44,9 +44,9 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      localResources: HashMap[String, LocalResource]) = {
+      localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    var JAVA_OPTS = ""
+    val JAVA_OPTS = ListBuffer[String]()
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + 
executorMemoryString + " "
@@ -56,10 +56,21 @@ trait ExecutorRunnableUtil extends Logging {
       JAVA_OPTS += opts
     }
 
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), 
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+    JAVA_OPTS += "-Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), 
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
+    // Certain configs need to be passed here because they are needed before 
the Executor
+    // registers with the Scheduler and transfers the spark configs. Since the 
Executor backend
+    // uses Akka to connect to the scheduler, the akka settings are needed as 
well as the
+    // authentication settings.
+    sparkConf.getAll.
+      filter { case (k, v) => k.startsWith("spark.auth") || 
k.startsWith("spark.akka") }.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + 
"\\\"" }
+
+    sparkConf.getAkkaConf.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + 
"\\\"" }
+
     // Commenting it out for now - so that people can refer to the properties 
if required. Remove
     // it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all 
cores to do gc - hence
@@ -85,25 +96,25 @@ trait ExecutorRunnableUtil extends Logging {
         }
     */
 
-    val commands = List[String](
-      Environment.JAVA_HOME.$() + "/bin/java" +
-      " -server " +
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+      "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause 
rescheduling.
       // Not killing the task leaves various aspects of the executor and (to 
some extent) the jvm in
       // an inconsistent state.
       // TODO: If the OOM is not recoverable by rescheduling it on different 
node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      executorCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    commands
+      "-XX:OnOutOfMemoryError='kill %p'") ++
+      JAVA_OPTS ++
+      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      masterAddress.toString,
+      slaveId.toString,
+      hostname.toString,
+      executorCores.toString,
+      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    // TODO: it would be nicer to just make sure there are no null commands 
here
+    commands.map(s => if (s == null) "null" else s).toList
   }
 
   private def setupDistributedCache(

Reply via email to