Repository: spark
Updated Branches:
  refs/heads/master 35e0db2d4 -> 033d80815


[SPARK-12384] Enables spark-clients to set the min(-Xms) and max(*.memory 
config) j…

## What changes were proposed in this pull request?

Currently Spark clients are started with the same memory setting for Xms and 
Xmx, leading to reserving unnecessarily high amounts of memory.
This behavior is changed and the clients can now specify an initial heap size 
using the extraJavaOptions in the config for driver, executor and am 
individually.
Note that only -Xms can be provided through this config option; if the client 
wants to set the max size (-Xmx), this has to be done via the *.memory 
configuration knobs which are currently supported.

## How was this patch tested?

Monitored executor and yarn logs in debug mode to verify the commands through 
which they are being launched in client and cluster mode. The driver memory was 
verified locally using jps -v. Setting the -Xmx parameter in 
extraJavaOptions raises an exception with the info provided.

Author: Dhruve Ashar <dhruveas...@gmail.com>

Closes #12115 from dhruve/impr/SPARK-12384.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/033d8081
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/033d8081
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/033d8081

Branch: refs/heads/master
Commit: 033d8081525a7137085ec898e2426a58056ee2b8
Parents: 35e0db2
Author: Dhruve Ashar <dhruveas...@gmail.com>
Authored: Thu Apr 7 10:39:21 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Thu Apr 7 10:39:21 2016 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala  |  6 +++---
 .../spark/launcher/WorkerCommandBuilder.scala    |  1 -
 .../spark/launcher/AbstractCommandBuilder.java   |  3 ++-
 .../spark/launcher/SparkClassCommandBuilder.java | 13 ++++++++++---
 .../launcher/SparkSubmitCommandBuilder.java      | 19 +++++++++++++++----
 .../launcher/SparkSubmitCommandBuilderSuite.java |  4 +---
 .../org/apache/spark/deploy/yarn/Client.scala    |  8 ++++----
 .../spark/deploy/yarn/ExecutorRunnable.scala     |  1 -
 8 files changed, 35 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e0fd248..acce6bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -456,9 +456,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
           "Set them directly on a SparkConf or in a properties file when using 
./bin/spark-submit."
         throw new Exception(msg)
       }
-      if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
-        val msg = s"$executorOptsKey is not allowed to alter memory settings 
(was '$javaOpts'). " +
-          "Use spark.executor.memory instead."
+      if (javaOpts.contains("-Xmx")) {
+        val msg = s"$executorOptsKey is not allowed to specify max heap memory 
settings " +
+          s"(was '$javaOpts'). Use spark.executor.memory instead."
         throw new Exception(msg)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala 
b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index a2add61..31b9c5e 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -37,7 +37,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, 
memoryMb: Int, comm
 
   override def buildCommand(env: JMap[String, String]): JList[String] = {
     val cmd = 
buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
-    cmd.add(s"-Xms${memoryMb}M")
     cmd.add(s"-Xmx${memoryMb}M")
     command.javaOpts.foreach(cmd.add)
     CommandBuilderUtils.addPermGenSizeOpt(cmd)

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 7a5e37c..c748808 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -74,7 +74,8 @@ abstract class AbstractCommandBuilder {
    *            SparkLauncher constructor that takes an environment), and may 
be modified to
    *            include other variables needed by the process to be executed.
    */
-  abstract List<String> buildCommand(Map<String, String> env) throws 
IOException;
+  abstract List<String> buildCommand(Map<String, String> env)
+      throws IOException, IllegalArgumentException;
 
   /**
    * Builds a list of arguments to run java.

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 6b9d36c..82b593a 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -41,7 +41,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder 
{
   }
 
   @Override
-  public List<String> buildCommand(Map<String, String> env) throws IOException 
{
+  public List<String> buildCommand(Map<String, String> env)
+      throws IOException, IllegalArgumentException {
     List<String> javaOptsKeys = new ArrayList<>();
     String memKey = null;
     String extraClassPath = null;
@@ -80,12 +81,18 @@ class SparkClassCommandBuilder extends 
AbstractCommandBuilder {
     }
 
     List<String> cmd = buildJavaCommand(extraClassPath);
+
     for (String key : javaOptsKeys) {
-      addOptionString(cmd, System.getenv(key));
+      String envValue = System.getenv(key);
+      if (!isEmpty(envValue) && envValue.contains("Xmx")) {
+        String msg = String.format("%s is not allowed to specify max heap(Xmx) 
memory settings " +
+                "(was %s). Use the corresponding configuration instead.", key, 
envValue);
+        throw new IllegalArgumentException(msg);
+      }
+      addOptionString(cmd, envValue);
     }
 
     String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, 
DEFAULT_MEM);
-    cmd.add("-Xms" + mem);
     cmd.add("-Xmx" + mem);
     addPermGenSizeOpt(cmd);
     cmd.add(className);

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index c31c42c..6941ca9 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -132,7 +132,8 @@ class SparkSubmitCommandBuilder extends 
AbstractCommandBuilder {
   }
 
   @Override
-  public List<String> buildCommand(Map<String, String> env) throws IOException 
{
+  public List<String> buildCommand(Map<String, String> env)
+      throws IOException, IllegalArgumentException {
     if (PYSPARK_SHELL_RESOURCE.equals(appResource) && !printInfo) {
       return buildPySparkShellCommand(env);
     } else if (SPARKR_SHELL_RESOURCE.equals(appResource) && !printInfo) {
@@ -211,7 +212,8 @@ class SparkSubmitCommandBuilder extends 
AbstractCommandBuilder {
     return args;
   }
 
-  private List<String> buildSparkSubmitCommand(Map<String, String> env) throws 
IOException {
+  private List<String> buildSparkSubmitCommand(Map<String, String> env)
+      throws IOException, IllegalArgumentException {
     // Load the properties file and check whether spark-submit will be running 
the app's driver
     // or just launching a cluster app. When running the driver, the JVM's 
argument will be
     // modified to cover the driver's configuration.
@@ -227,6 +229,16 @@ class SparkSubmitCommandBuilder extends 
AbstractCommandBuilder {
     addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
     addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
 
+    // We don't want the client to specify Xmx. These have to be set by their 
corresponding
+    // memory flag --driver-memory or configuration entry spark.driver.memory
+    String driverExtraJavaOptions = 
config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
+    if (!isEmpty(driverExtraJavaOptions) && 
driverExtraJavaOptions.contains("Xmx")) {
+      String msg = String.format("Not allowed to specify max heap(Xmx) memory 
settings through " +
+                   "java options (was %s). Use the corresponding 
--driver-memory or " +
+                   "spark.driver.memory configuration instead.", 
driverExtraJavaOptions);
+      throw new IllegalArgumentException(msg);
+    }
+
     if (isClientMode) {
       // Figuring out where the memory value come from is a little tricky due 
to precedence.
       // Precedence is observed in the following order:
@@ -240,9 +252,8 @@ class SparkSubmitCommandBuilder extends 
AbstractCommandBuilder {
         isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : 
null;
       String memory = firstNonEmpty(tsMemory, 
config.get(SparkLauncher.DRIVER_MEMORY),
         System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), 
DEFAULT_MEM);
-      cmd.add("-Xms" + memory);
       cmd.add("-Xmx" + memory);
-      addOptionString(cmd, 
config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS));
+      addOptionString(cmd, driverExtraJavaOptions);
       mergeEnvPathList(env, getLibPathEnvName(),
         config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
 
b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index 29cbbe8..c7e8b2e 100644
--- 
a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ 
b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -79,7 +79,6 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite 
{
     
assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
         File.pathSeparator, "/driverLibPath"));
     assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, 
"/driverCp"));
-    assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms42g"));
     assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
     assertTrue("Command should contain user-defined conf.",
       Collections.indexOfSubList(cmd, Arrays.asList(parser.CONF, 
"spark.randomOption=foo")) > 0);
@@ -202,12 +201,11 @@ public class SparkSubmitCommandBuilderSuite extends 
BaseSuite {
     // Checks below are different for driver and non-driver mode.
 
     if (isDriver) {
-      assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms1g"));
       assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
     } else {
       boolean found = false;
       for (String arg : cmd) {
-        if (arg.startsWith("-Xms") || arg.startsWith("-Xmx")) {
+        if (arg.startsWith("-Xmx")) {
           found = true;
           break;
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5e7e3be..04e91f8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -839,16 +839,16 @@ private[spark] class Client(
       // Validate and include yarn am specific java options in yarn-client 
mode.
       sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
         if (opts.contains("-Dspark")) {
-          val msg = s"$${amJavaOptions.key} is not allowed to set Spark 
options (was '$opts'). "
+          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark 
options (was '$opts')."
           throw new SparkException(msg)
         }
-        if (opts.contains("-Xmx") || opts.contains("-Xms")) {
-          val msg = s"$${amJavaOptions.key} is not allowed to alter memory 
settings (was '$opts')."
+        if (opts.contains("-Xmx")) {
+          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max 
heap memory settings " +
+            s"(was '$opts'). Use spark.yarn.am.memory instead."
           throw new SparkException(msg)
         }
         javaOpts ++= 
Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
       }
-
       sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
         prefixEnv = Some(getClusterPath(sparkConf, 
Utils.libraryPathEnvPrefix(Seq(paths))))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/033d8081/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7b55d78..ef7908a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -147,7 +147,6 @@ private[yarn] class ExecutorRunnable(
 
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
-    javaOpts += "-Xms" + executorMemoryString
     javaOpts += "-Xmx" + executorMemoryString
 
     // Set extra Java options for the executor, if defined


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to