Repository: spark
Updated Branches:
refs/heads/master b42ad165b -> 007ae6878
[SPARK-24003][CORE] Add support to provide spark.executor.extraJavaOptions in
terms of App Id and/or Executor Ids
## What changes were proposed in this pull request?
Added support to specify the `spark.executor.extraJavaOptions` value in terms
of the `{{APP_ID}}` and/or `{{EXECUTOR_ID}}` patterns: `{{APP_ID}}` is
replaced by the application ID and `{{EXECUTOR_ID}}` by the executor ID when
the executor is started.
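
For example, a minimal sketch (the option value and the IDs mentioned in the
comment are illustrative only):

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration: with app ID "app-20180430134003-0000" and
// executor ID "1", each executor JVM would be launched with
// -Xloggc:/tmp/app-20180430134003-0000-1.gc.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc")
```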
## How was this patch tested?
I verified this by checking the executor process command and GC logs, in the
Standalone, YARN, and Mesos deployment modes, in both client and cluster mode.
Author: Devaraj K <[email protected]>
Closes #21088 from devaraj-kavali/SPARK-24003.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/007ae687
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/007ae687
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/007ae687
Branch: refs/heads/master
Commit: 007ae6878f4b4defe1f08114212fa7289fc9ee4a
Parents: b42ad16
Author: Devaraj K <[email protected]>
Authored: Mon Apr 30 13:40:03 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Apr 30 13:40:03 2018 -0700
----------------------------------------------------------------------
.../apache/spark/deploy/worker/ExecutorRunner.scala | 8 ++++++--
.../src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++
docs/configuration.md | 5 +++++
.../k8s/features/BasicExecutorFeatureStep.scala | 4 +++-
.../mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 +++-
.../mesos/MesosFineGrainedSchedulerBackend.scala | 4 +++-
.../scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++++--
.../apache/spark/deploy/yarn/ExecutorRunnable.scala | 3 ++-
8 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d4d8521..dc6a307 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import com.google.common.io.Files
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
@@ -142,7 +142,11 @@ private[deploy] class ExecutorRunner(
private def fetchAndRunExecutor() {
try {
// Launch the process
- val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
+ val subsOpts = appDesc.command.javaOpts.map {
+ Utils.substituteAppNExecIds(_, appId, execId.toString)
+ }
+ val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
+ val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d2be932..dcad1b9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2689,6 +2689,21 @@ private[spark] object Utils extends Logging {
s"k8s://$resolvedURL"
}
+
+ /**
+ * Replaces all the {{EXECUTOR_ID}} occurrences with the Executor Id
+ * and {{APP_ID}} occurrences with the App Id.
+ */
+ def substituteAppNExecIds(opt: String, appId: String, execId: String): String = {
+ opt.replace("{{APP_ID}}", appId).replace("{{EXECUTOR_ID}}", execId)
+ }
+
+ /**
+ * Replaces all the {{APP_ID}} occurrences with the App Id.
+ */
+ def substituteAppId(opt: String, appId: String): String = {
+ opt.replace("{{APP_ID}}", appId)
+ }
}
private[util] object CallerContext extends Logging {
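
A minimal usage sketch of the two new helpers (the IDs are made up, and the
calls assume code living inside the org.apache.spark package, since Utils is
private[spark]):

```scala
import org.apache.spark.util.Utils

// Both helpers are plain String.replace calls, so every occurrence of the
// pattern in the option string is substituted.
val opt = "-Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc"

Utils.substituteAppNExecIds(opt, "app-0001", "2")
// => "-Xloggc:/tmp/app-0001-2.gc"

// substituteAppId only knows the app ID; {{EXECUTOR_ID}} is left untouched.
Utils.substituteAppId(opt, "app-0001")
// => "-Xloggc:/tmp/app-0001-{{EXECUTOR_ID}}.gc"
```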
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index fb02d7e..8a1aace 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -328,6 +328,11 @@ Apart from these, the following properties are also available, and may be useful
Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this
option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file
used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
+
+ The following symbols, if present, will be interpolated: {{APP_ID}} will be replaced by
+ the application ID and {{EXECUTOR_ID}} will be replaced by the executor ID. For example,
+ to enable verbose GC logging to a file named for the executor ID of the app in /tmp, pass a 'value' of:
+ <code>-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc</code>
</td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d220975..529069d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -89,7 +89,9 @@ private[spark] class BasicExecutorFeatureStep(
val executorExtraJavaOptionsEnv = kubernetesConf
.get(EXECUTOR_JAVA_OPTIONS)
.map { opts =>
- val delimitedOpts = Utils.splitCommandString(opts)
+ val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
+ kubernetesConf.roleSpecificConf.executorId)
+ val delimitedOpts = Utils.splitCommandString(subsOpts)
delimitedOpts.zipWithIndex.map {
case (opt, index) =>
new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
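
For context, the K8s step splits the substituted string into individual
options and publishes each one as an indexed environment variable. A
simplified, self-contained sketch (tuples stand in for the fabric8 EnvVar
objects, whitespace splitting stands in for Utils.splitCommandString's
quote-aware parsing, and the SPARK_JAVA_OPT_ prefix is assumed to be what
ENV_JAVA_OPT_PREFIX resolves to):

```scala
// Substitute the IDs first, then split and index the options.
val opts = "-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc"
val subsOpts = opts
  .replace("{{APP_ID}}", "spark-app-0001")  // made-up app ID
  .replace("{{EXECUTOR_ID}}", "3")          // made-up executor ID
val envVars = subsOpts.split("\\s+").toSeq.zipWithIndex.map {
  case (opt, index) => s"SPARK_JAVA_OPT_$index" -> opt
}
// envVars == Seq("SPARK_JAVA_OPT_0" -> "-verbose:gc",
//                "SPARK_JAVA_OPT_1" -> "-Xloggc:/tmp/spark-app-0001-3.gc")
```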
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 53f5f61..9b75e4c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -227,7 +227,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
}
- val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
+ val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map {
+ Utils.substituteAppNExecIds(_, appId, taskId)
+ }.getOrElse("")
// Set the environment variable through a command prefix
// to append to the existing value of the variable
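
Note that in the coarse-grained Mesos backend the Mesos task ID plays the role
of the executor ID. A small sketch of the getOption/map/getOrElse pattern
above (the conf key is real; the IDs are invented, and the replace calls
inline what Utils.substituteAppNExecIds does):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Dapp={{APP_ID}} -Dexec={{EXECUTOR_ID}}")
val appId = "driver-20180430134003-0007"  // invented app ID
val taskId = "5"                          // invented Mesos task ID

// A missing conf entry yields "" rather than a substituted string.
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map { opts =>
  opts.replace("{{APP_ID}}", appId).replace("{{EXECUTOR_ID}}", taskId)
}.getOrElse("")
// extraJavaOpts == "-Dapp=driver-20180430134003-0007 -Dexec=5"
```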
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index d6d939d..71a70ff 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -111,7 +111,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
}
- val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+ val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map {
+ Utils.substituteAppNExecIds(_, appId, execId)
+ }.getOrElse("")
val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5763c3d..bafb129 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -892,7 +892,9 @@ private[spark] class Client(
// Include driver-specific java options if we are launching a driver
if (isClusterMode) {
sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
- javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+ javaOpts ++= Utils.splitCommandString(opts)
+ .map(Utils.substituteAppId(_, appId.toString))
+ .map(YarnSparkHadoopUtil.escapeForShell)
}
val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
sys.props.get("spark.driver.libraryPath")).flatten
@@ -914,7 +916,9 @@ private[spark] class Client(
s"(was '$opts'). Use spark.yarn.am.memory instead."
throw new SparkException(msg)
}
- javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+ javaOpts ++= Utils.splitCommandString(opts)
+ .map(Utils.substituteAppId(_, appId.toString))
+ .map(YarnSparkHadoopUtil.escapeForShell)
}
sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
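
On the driver/AM side only the application ID is known when these options are
built, so the YARN client substitutes just {{APP_ID}} and leaves any
{{EXECUTOR_ID}} pattern untouched. A small sketch (the app ID is made up, and
the replace call inlines what Utils.substituteAppId does):

```scala
// Driver/AM options: only {{APP_ID}} is rewritten; {{EXECUTOR_ID}} survives.
val opts = "-Xloggc:/tmp/{{APP_ID}}-driver.gc"
val appId = "application_1525120000000_0001"  // made-up YARN application ID
val rewritten = opts.replace("{{APP_ID}}", appId)
// rewritten == "-Xloggc:/tmp/application_1525120000000_0001-driver.gc"
```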
http://git-wip-us.apache.org/repos/asf/spark/blob/007ae687/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ab08698..a2a18cd 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -141,7 +141,8 @@ private[yarn] class ExecutorRunnable(
// Set extra Java options for the executor, if defined
sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
- javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+ val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
+ javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
}
sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))