Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b8818d892 -> 2ce240cfe


[SPARK-13723][YARN] Change behavior of --num-executors with dynamic allocation.

## What changes were proposed in this pull request?

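This changes the behavior of --num-executors and spark.executor.instances when
using dynamic allocation. Instead of turning dynamic allocation off, the value
is now used as a lower bound on the initial number of executors: the initial
count is the maximum of spark.dynamicAllocation.minExecutors,
spark.dynamicAllocation.initialExecutors, and spark.executor.instances.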

This change was discussed on
[SPARK-13723](https://issues.apache.org/jira/browse/SPARK-13723). I highly
recommend merging it while we can still change the behavior for 2.0.0. In
practice, the 1.x behavior surprises users (it is not obvious that setting the
number of executors disables dynamic allocation) and wastes cluster resources
because users rarely notice the log message.

## How was this patch tested?

This patch updates tests and adds a test for 
Utils.getDynamicAllocationInitialExecutors.

Author: Ryan Blue <[email protected]>

Closes #13338 from rdblue/SPARK-13723-num-executors-with-dynamic-allocation.

(cherry picked from commit 738f134bf4bf07bafb17e7066cf1a36e315872c2)
Signed-off-by: Tom Graves <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ce240cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ce240cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ce240cf

Branch: refs/heads/branch-2.0
Commit: 2ce240cfe0cbcb944d225b2455a9cb2e806699f0
Parents: b8818d8
Author: Ryan Blue <[email protected]>
Authored: Thu Jun 23 14:03:46 2016 -0500
Committer: Tom Graves <[email protected]>
Committed: Thu Jun 23 14:03:58 2016 -0500

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 11 +++++-----
 .../spark/deploy/SparkSubmitArguments.scala     |  2 ++
 .../scala/org/apache/spark/util/Utils.scala     | 22 ++++++++++++--------
 .../org/apache/spark/util/UtilsSuite.scala      | 18 +++++++++++++++-
 docs/configuration.md                           |  3 +++
 docs/running-on-yarn.md                         |  2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  2 +-
 7 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0926d05..932ba16 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,9 +25,10 @@ import scala.util.control.ControlThrowable
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the 
workload.
@@ -87,11 +88,9 @@ private[spark] class ExecutorAllocationManager(
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors.
-  private val minNumExecutors = 
conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-  private val maxNumExecutors = 
conf.getInt("spark.dynamicAllocation.maxExecutors",
-    Integer.MAX_VALUE)
-  private val initialNumExecutors = 
conf.getInt("spark.dynamicAllocation.initialExecutors",
-    minNumExecutors)
+  private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  private val initialNumExecutors = 
Utils.getDynamicAllocationInitialExecutors(conf)
 
   // How long there must be backlogged tasks for before an addition is 
triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(

http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 206c130..f1761e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -550,6 +550,8 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
         |                              (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 
"default").
         |  --num-executors NUM         Number of executors to launch (Default: 
2).
+        |                              If dynamic allocation is enabled, the 
initial number of
+        |                              executors will be at least NUM.
         |  --archives ARCHIVES         Comma separated list of archives to be 
extracted into the
         |                              working directory of each executor.
         |  --principal PRINCIPAL       Principal to be used to login to KDC, 
while running on

http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 17d193b..f77cc2f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -52,6 +52,7 @@ import org.slf4j.Logger
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, SerializerInstance}
 
@@ -2309,21 +2310,24 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return whether dynamic allocation is enabled in the given conf
-   * Dynamic allocation and explicitly setting the number of executors are 
inherently
-   * incompatible. In environments where dynamic allocation is turned on by 
default,
-   * the latter should override the former (SPARK-9092).
+   * Return whether dynamic allocation is enabled in the given conf.
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val dynamicAllocationEnabled = 
conf.getBoolean("spark.dynamicAllocation.enabled", false)
-    if (numExecutor != 0 && dynamicAllocationEnabled) {
-      logWarning("Dynamic Allocation and num executors both set, thus dynamic 
allocation disabled.")
-    }
-    numExecutor == 0 && dynamicAllocationEnabled &&
+    dynamicAllocationEnabled &&
       (!isLocalMaster(conf) || 
conf.getBoolean("spark.dynamicAllocation.testing", false))
   }
 
+  /**
+   * Return the initial number of executors for dynamic allocation.
+   */
+  def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
+    Seq(
+      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
+      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
+      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+  }
+
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = 
{
     val resource = createResource
     try f.apply(resource) finally resource.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a5363f0..e3a8e83 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -761,13 +761,29 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.dynamicAllocation.enabled", "true")) === true)
     assert(Utils.isDynamicAllocationEnabled(
-      conf.set("spark.executor.instances", "1")) === false)
+      conf.set("spark.executor.instances", "1")) === true)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "0")) === true)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) 
=== false)
     
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing",
 "true")))
   }
 
+  test("getDynamicAllocationInitialExecutors") {
+    val conf = new SparkConf()
+    assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
+    assert(Utils.getDynamicAllocationInitialExecutors(
+      conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use 
minExecutors
+      conf.set("spark.executor.instances", "2")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use 
executor.instances
+      conf.set("spark.executor.instances", "4")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use 
executor.instances
+      conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use 
initialExecutors
+      conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+  }
+
+
   test("encodeFileNameToURIRawPath") {
     assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
     assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")

http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index fbda91c..cee59cf 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1236,6 +1236,9 @@ Apart from these, the following properties are also 
available, and may be useful
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
     Initial number of executors to run if dynamic allocation is enabled.
+    <br /><br />
+    If `--num-executors` (or `spark.executor.instances`) is set and larger 
than this value, it will
+    be used as the initial number of executors.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9833806..dbd46cc 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -244,7 +244,7 @@ To use a custom metrics.properties for the application 
master and executors, upd
  <td><code>spark.executor.instances</code></td>
   <td><code>2</code></td>
   <td>
-    The number of executors. Note that this property is incompatible with 
<code>spark.dynamicAllocation.enabled</code>. If both 
<code>spark.dynamicAllocation.enabled</code> and 
<code>spark.executor.instances</code> are specified, dynamic allocation is 
turned off and the specified number of <code>spark.executor.instances</code> is 
used.
+    The number of executors for static allocation. With 
<code>spark.dynamicAllocation.enabled</code>, the initial set of executors will 
be at least this large.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/2ce240cf/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index de6cd94..156a7a3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -520,7 +520,7 @@ object YarnSparkHadoopUtil {
       numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
-      val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
+      val initialNumExecutors = 
Utils.getDynamicAllocationInitialExecutors(conf)
       val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= 
maxNumExecutors,
         s"initial executor number $initialNumExecutors must between min 
executor number " +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to