Repository: spark
Updated Branches:
  refs/heads/master b42ad165b -> 007ae6878


[SPARK-24003][CORE] Add support to provide spark.executor.extraJavaOptions in 
terms of App Id and/or Executor Id's

## What changes were proposed in this pull request?

Added support for specifying the 'spark.executor.extraJavaOptions' value in 
terms of `{{APP_ID}}` and/or `{{EXECUTOR_ID}}`. `{{APP_ID}}` will be replaced 
by the application ID and `{{EXECUTOR_ID}}` will be replaced by the executor 
ID while starting the executor.

## How was this patch tested?

I have verified this by checking the executor process command and GC logs. I 
verified the same in different deployment modes (Standalone, YARN, Mesos), in 
both client and cluster modes.

Author: Devaraj K <[email protected]>

Closes #21088 from devaraj-kavali/SPARK-24003.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/007ae687
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/007ae687
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/007ae687

Branch: refs/heads/master
Commit: 007ae6878f4b4defe1f08114212fa7289fc9ee4a
Parents: b42ad16
Author: Devaraj K <[email protected]>
Authored: Mon Apr 30 13:40:03 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Apr 30 13:40:03 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/worker/ExecutorRunner.scala  |  8 ++++++--
 .../src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++
 docs/configuration.md                                |  5 +++++
 .../k8s/features/BasicExecutorFeatureStep.scala      |  4 +++-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala   |  4 +++-
 .../mesos/MesosFineGrainedSchedulerBackend.scala     |  4 +++-
 .../scala/org/apache/spark/deploy/yarn/Client.scala  |  8 ++++++--
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala  |  3 ++-
 8 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d4d8521..dc6a307 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.io.Files
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
@@ -142,7 +142,11 @@ private[deploy] class ExecutorRunner(
   private def fetchAndRunExecutor() {
     try {
       // Launch the process
-      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new 
SecurityManager(conf),
+      val subsOpts = appDesc.command.javaOpts.map {
+        Utils.substituteAppNExecIds(_, appId, execId.toString)
+      }
+      val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
+      val builder = CommandUtils.buildProcessBuilder(subsCommand, new 
SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
       val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d2be932..dcad1b9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2689,6 +2689,21 @@ private[spark] object Utils extends Logging {
 
     s"k8s://$resolvedURL"
   }
+
+  /**
+   * Replaces all the {{EXECUTOR_ID}} occurrences with the Executor Id
+   * and {{APP_ID}} occurrences with the App Id.
+   */
+  def substituteAppNExecIds(opt: String, appId: String, execId: String): 
String = {
+    opt.replace("{{APP_ID}}", appId).replace("{{EXECUTOR_ID}}", execId)
+  }
+
+  /**
+   * Replaces all the {{APP_ID}} occurrences with the App Id.
+   */
+  def substituteAppId(opt: String, appId: String): String = {
+    opt.replace("{{APP_ID}}", appId)
+  }
 }
 
 private[util] object CallerContext extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index fb02d7e..8a1aace 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -328,6 +328,11 @@ Apart from these, the following properties are also 
available, and may be useful
     Note that it is illegal to set Spark properties or maximum heap size 
(-Xmx) settings with this
     option. Spark properties should be set using a SparkConf object or the 
spark-defaults.conf file
     used with the spark-submit script. Maximum heap size settings can be set 
with spark.executor.memory.
+
+    The following symbols, if present will be interpolated: {{APP_ID}} will be 
replaced by
+    application ID and {{EXECUTOR_ID}} will be replaced by executor ID. For 
example, to enable
+    verbose gc logging to a file named for the executor ID of the app in /tmp, 
pass a 'value' of:
+    <code>-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc</code>
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d220975..529069d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -89,7 +89,9 @@ private[spark] class BasicExecutorFeatureStep(
     val executorExtraJavaOptionsEnv = kubernetesConf
       .get(EXECUTOR_JAVA_OPTIONS)
       .map { opts =>
-        val delimitedOpts = Utils.splitCommandString(opts)
+        val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
+          kubernetesConf.roleSpecificConf.executorId)
+        val delimitedOpts = Utils.splitCommandString(subsOpts)
         delimitedOpts.zipWithIndex.map {
           case (opt, index) =>
             new 
EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 53f5f61..9b75e4c 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -227,7 +227,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       environment.addVariables(
         
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
+    val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map {
+      Utils.substituteAppNExecIds(_, appId, taskId)
+    }.getOrElse("")
 
     // Set the environment variable through a command prefix
     // to append to the existing value of the variable

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index d6d939d..71a70ff 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -111,7 +111,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       environment.addVariables(
         
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = 
sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+    val extraJavaOpts = 
sc.conf.getOption("spark.executor.extraJavaOptions").map {
+      Utils.substituteAppNExecIds(_, appId, execId)
+    }.getOrElse("")
 
     val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { 
p =>
       Utils.libraryPathEnvPrefix(Seq(p))

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5763c3d..bafb129 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -892,7 +892,9 @@ private[spark] class Client(
     // Include driver-specific java options if we are launching a driver
     if (isClusterMode) {
       sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
-        javaOpts ++= 
Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+        javaOpts ++= Utils.splitCommandString(opts)
+          .map(Utils.substituteAppId(_, appId.toString))
+          .map(YarnSparkHadoopUtil.escapeForShell)
       }
       val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
         sys.props.get("spark.driver.libraryPath")).flatten
@@ -914,7 +916,9 @@ private[spark] class Client(
             s"(was '$opts'). Use spark.yarn.am.memory instead."
           throw new SparkException(msg)
         }
-        javaOpts ++= 
Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+        javaOpts ++= Utils.splitCommandString(opts)
+          .map(Utils.substituteAppId(_, appId.toString))
+          .map(YarnSparkHadoopUtil.escapeForShell)
       }
       sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
         prefixEnv = Some(getClusterPath(sparkConf, 
Utils.libraryPathEnvPrefix(Seq(paths))))

http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ab08698..a2a18cd 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -141,7 +141,8 @@ private[yarn] class ExecutorRunnable(
 
     // Set extra Java options for the executor, if defined
     sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
-      javaOpts ++= 
Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
+      javaOpts ++= 
Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
     }
     sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
       prefixEnv = Some(Client.getClusterPath(sparkConf, 
Utils.libraryPathEnvPrefix(Seq(p))))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to