Repository: spark
Updated Branches:
  refs/heads/master c081b21b1 -> b2047b55c


SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number

Author: Sandy Ryza <sa...@cloudera.com>

Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits:

d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required
b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
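
In practical terms, an application can now enable dynamic allocation without
setting any executor bounds. A minimal sketch of the resulting configuration
surface (the app name and executor counts below are illustrative, not part of
the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")        // hypothetical app name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")    // still required
      // All of the following are now optional:
      .set("spark.dynamicAllocation.minExecutors", "2")      // default 0
      .set("spark.dynamicAllocation.initialExecutors", "5")  // default: minExecutors
      .set("spark.dynamicAllocation.maxExecutors", "20")     // default Integer.MAX_VALUE
    val sc = new SparkContext(conf)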


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2047b55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2047b55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2047b55

Branch: refs/heads/master
Commit: b2047b55c5fc85de6b63276d8ab9610d2496e08b
Parents: c081b21
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Feb 2 12:27:08 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Feb 2 12:27:08 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 14 ++++++++------
 .../spark/ExecutorAllocationManagerSuite.scala  | 15 +++++++--------
 docs/configuration.md                           | 20 ++++++++++++++------
 docs/job-scheduling.md                          |  9 ++++-----
 .../spark/deploy/yarn/ClientArguments.scala     | 17 +++++++++++++----
 5 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b28da19..5d5288b 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
  *   spark.dynamicAllocation.enabled - Whether this feature is enabled
  *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
  *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
  *
  *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
  *     If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
 
   import ExecutorAllocationManager._
 
-  // Lower and upper bounds on the number of executors. These are required.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  // Lower and upper bounds on the number of executors.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+    Integer.MAX_VALUE)
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
    */
   private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be negative!")
     }
-    if (minNumExecutors == 0 || maxNumExecutors == 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+    if (maxNumExecutors == 0) {
+      throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
     }
     if (minNumExecutors > maxNumExecutors) {
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
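
The net effect of the validation change: min/max no longer need to be set, a
zero minimum is legal, and only a zero maximum is rejected. A simplified
standalone sketch of the new rules (plain exceptions instead of SparkException,
so it compiles without Spark on the classpath):

    // Sketch of the relaxed validation, assuming the defaults min=0 and
    // max=Integer.MAX_VALUE introduced in the hunk above.
    def validateExecutorBounds(min: Int, max: Int): Unit = {
      if (min < 0 || max < 0) {
        throw new IllegalArgumentException("min/max executors cannot be negative")
      }
      if (max == 0) {
        throw new IllegalArgumentException("maxExecutors cannot be 0")
      }
      if (min > max) {
        throw new IllegalArgumentException(s"minExecutors ($min) must not exceed maxExecutors ($max)")
      }
    }

    validateExecutorBounds(0, Integer.MAX_VALUE) // new defaults: passes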

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 0e4df17..57081dd 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManagerSuite._
 
   test("verify min/max executors") {
-    // No min or max
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
-    intercept[SparkException] { new SparkContext(conf) }
-    SparkEnv.get.stop() // cleanup the created environment
-    SparkContext.clearActiveContext()
+    val sc0 = new SparkContext(conf)
+    assert(sc0.executorAllocationManager.isDefined)
+    sc0.stop()
 
-    // Only min
-    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+    // Min < 0
+    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
     intercept[SparkException] { new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
-    // Only max
-    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+    // Max < 0
+    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
     intercept[SparkException] { new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index e4e4b8d..08c6bef 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1098,24 +1098,32 @@ Apart from these, the following properties are also available, and may be useful
     available on YARN mode. For more detail, see the description
     <a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
     <br><br>
-    This requires the following configurations to be set:
+    This requires <code>spark.shuffle.service.enabled</code> to be set.
+    The following configurations are also relevant:
     <code>spark.dynamicAllocation.minExecutors</code>,
     <code>spark.dynamicAllocation.maxExecutors</code>, and
-    <code>spark.shuffle.service.enabled</code>
+    <code>spark.dynamicAllocation.initialExecutors</code>
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
-  <td>(none)</td>
+  <td>0</td>
   <td>
-    Lower bound for the number of executors if dynamic allocation is enabled (required).
+    Lower bound for the number of executors if dynamic allocation is enabled.
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.maxExecutors</code></td>
-  <td>(none)</td>
+  <td>Integer.MAX_VALUE</td>
+  <td>
+    Upper bound for the number of executors if dynamic allocation is enabled.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.initialExecutors</code></td>
+  <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
-    Upper bound for the number of executors if dynamic allocation is enabled (required).
+    Initial number of executors to run if dynamic allocation is enabled.
   </td>
 </tr>
 <tr>
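
The table above encodes a chain of defaults: initialExecutors falls back to
minExecutors when unset. A quick sketch of how that reads through SparkConf
(the keys are from the table; the value 3 is arbitrary):

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.dynamicAllocation.minExecutors", "3")
    val min     = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    val max     = conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE)
    val initial = conf.getInt("spark.dynamicAllocation.initialExecutors", min)
    // min == 3, initial == 3 (inherited from min), max == Integer.MAX_VALUE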

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index a5425eb..5295e35 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -77,11 +77,10 @@ scheduling while sharing cluster resources efficiently.
 ### Configuration and Setup
 
 All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
-To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
-provide lower and upper bounds for the number of executors through
-`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
-configurations are described on the [configurations page](configuration.html#dynamic-allocation)
-and in the subsequent sections in detail.
+To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
+Other relevant configurations are described on the
+[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
+detail.
 
 Additionally, your application must use an external shuffle service. The purpose of the service is
 to preserve the shuffle files written by executors so the executors can be safely removed (more

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245..5eb2023 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orNull
-    // If dynamic allocation is enabled, start at the max number of executors
+    // If dynamic allocation is enabled, start at the configured initial number of executors.
+    // Default to minExecutors if no initialExecutors is set.
     if (isDynamicAllocationEnabled) {
+      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
       val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      if (!sparkConf.contains(maxExecutorsConf)) {
+      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+      // If defined, initial executors must be between min and max
+      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
         throw new IllegalArgumentException(
-          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+          s"$initialExecutorsConf must be between $minExecutorsConf and $maxExecutorsConf!")
       }
-      numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+      numExecutors = initialNumExecutors
     }
   }
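
The upshot for YARN: the client now derives its starting executor count rather
than requiring maxExecutors, and fails fast when an explicit initial count falls
outside the min/max bounds. A standalone sketch of the equivalent resolution
(a hypothetical plain function, throwing IllegalArgumentException as the client
does):

    import org.apache.spark.SparkConf

    // Hypothetical helper mirroring the new ClientArguments logic.
    def resolveInitialExecutors(sparkConf: SparkConf): Int = {
      val min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0)
      val initial = sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min)
      val max = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE)
      if (initial < min || initial > max) {
        throw new IllegalArgumentException(
          s"initial executor count ($initial) must be between $min and $max")
      }
      initial
    }

    val ok = new SparkConf().set("spark.dynamicAllocation.minExecutors", "5")
    assert(resolveInitialExecutors(ok) == 5) // unset initial falls back to min

    val bad = ok.clone().set("spark.dynamicAllocation.initialExecutors", "2")
    // resolveInitialExecutors(bad) throws: 2 is below the minimum of 5.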
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
