Repository: spark
Updated Branches:
  refs/heads/branch-2.3 6eee545f9 -> 30242b664


[SPARK-23365][CORE] Do not adjust num executors when killing idle executors.

The ExecutorAllocationManager should not adjust the target number of
executors when killing idle executors, as it has already adjusted the
target number down based on the task backlog.

The name `replace` was misleading with DynamicAllocation on, as the target 
number
of executors is changed outside of the call to `killExecutors`, so I adjusted 
that name.  Also separated out the logic of `countFailures` as you don't always 
want that tied to `replace`.

While I was there I made two changes that weren't directly related to this:
1) Fixed `countFailures` in a couple cases where it was getting an incorrect 
value since it used to be tied to `replace`, eg. when killing executors on a 
blacklisted node.
2) hard error if you call `sc.killExecutors` with dynamic allocation on, since 
that's another way the ExecutorAllocationManager and the 
CoarseGrainedSchedulerBackend would get out of sync.

Added a unit test case which verifies that the calls to 
ExecutorAllocationClient do not adjust the number of executors.

Author: Imran Rashid <iras...@cloudera.com>

Closes #20604 from squito/SPARK-23365.

(cherry picked from commit ecb8b383af1cf1b67f3111c148229e00c9c17c40)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30242b66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30242b66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30242b66

Branch: refs/heads/branch-2.3
Commit: 30242b664a05120c01b54ba9ac0ebed114f0d54e
Parents: 6eee545
Author: Imran Rashid <iras...@cloudera.com>
Authored: Tue Feb 27 11:12:32 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Feb 27 11:12:54 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala | 15 ++---
 .../spark/ExecutorAllocationManager.scala       | 20 ++++--
 .../scala/org/apache/spark/SparkContext.scala   | 13 +++-
 .../spark/scheduler/BlacklistTracker.scala      |  3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 22 ++++---
 .../spark/ExecutorAllocationManagerSuite.scala  | 66 +++++++++++++++++++-
 .../StandaloneDynamicAllocationSuite.scala      |  3 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 14 ++---
 8 files changed, 121 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 9112d93..63d87b4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered 
a failure, and
-   * killed tasks that are running on the executor will count towards the 
failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the 
limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, 
default false
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when 
they are killed, whether
+    *                     to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
-    replace: Boolean = false,
+    adjustTargetNumExecutors: Boolean,
+    countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
   /**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), 
adjustTargetNumExecutors = true,
+      countFailures = false)
     killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6c59038..189d913 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -81,7 +82,8 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
 private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
-    conf: SparkConf)
+    conf: SparkConf,
+    blockManagerMaster: BlockManagerMaster)
   extends Logging {
 
   allocationManager =>
@@ -151,7 +153,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener
+  val listener = new ExecutorAllocationListener
 
   // Executor that handles the scheduling task.
   private val executor =
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
 
       // If the new target has not changed, avoid sending a message to the 
cluster manager
       if (numExecutorsTarget < oldNumExecutorsTarget) {
+        // We lower the target number of executors but don't actively kill any 
yet.  Killing is
+        // controlled separately by an idle timeout.  It's still helpful to 
reduce the target number
+        // in case an executor just happens to get lost (eg., bad hardware, or 
the cluster manager
+        // preempts it) -- in that case, there is no point in trying to 
immediately  get a new
+        // executor, since we wouldn't even use it yet.
         client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
         logDebug(s"Lowering target number of executors to $numExecutorsTarget 
(previously " +
           s"$oldNumExecutorsTarget) because not all requested executors are 
actually needed")
@@ -455,7 +462,10 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we 
already did that
+      // when the task backlog decreased.
+      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = 
false,
+        countFailures = false, force = false)
     }
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.
@@ -575,7 +585,7 @@ private[spark] class ExecutorAllocationManager(
         // Note that it is not necessary to query the executors since all the 
cached
         // blocks we are concerned with are reported to the driver. Note that 
this
         // does not include broadcast blocks.
-        val hasCachedBlocks = 
SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+        val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
         val now = clock.getTimeMillis()
         val timeout = {
           if (hasCachedBlocks) {
@@ -610,7 +620,7 @@ private[spark] class ExecutorAllocationManager(
    * This class is intentionally conservative in its assumptions about the 
relative ordering
    * and consistency of events returned by the listener.
    */
-  private class ExecutorAllocationListener extends SparkListener {
+  private[spark] class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     // Number of running tasks per stage including speculative tasks.

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3828d4f..f5b560c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -533,7 +533,8 @@ class SparkContext(config: SparkConf) extends Logging {
         schedulerBackend match {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
-              schedulerBackend.asInstanceOf[ExecutorAllocationClient], 
listenerBus, _conf))
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], 
listenerBus, _conf,
+              _env.blockManager.master))
           case _ =>
             None
         }
@@ -1632,6 +1633,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
    *
+   * This is not supported when dynamic allocation is turned on.
+   *
    * @note This is an indication to the cluster manager that the application 
wishes to adjust
    * its resource usage downwards. If the application wishes to replace the 
executors it kills
    * through this method with new ones, it should follow up explicitly with a 
call to
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
+          "killExecutors() unsupported with Dynamic Allocation turned on")
+        b.killExecutors(executorIds, adjustTargetNumExecutors = true, 
countFailures = false,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
@@ -1681,7 +1687,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, 
countFailures = true,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala 
b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index cd8e61d..952598f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -152,7 +152,8 @@ private[scheduler] class BlacklistTracker (
         case Some(a) =>
           logInfo(s"Killing blacklisted executor id $exec " +
             s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-          a.killExecutors(Seq(exec), true, true)
+          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
+            force = true)
         case None =>
           logWarning(s"Not attempting to kill blacklisted executor id $exec " +
             s"since allocation client is not defined.")

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4d75063..5627a55 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -147,7 +147,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
       case KillExecutorsOnHost(host) =>
         scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
-          killExecutors(exec.toSeq, replace = true, force = true)
+          killExecutors(exec.toSeq, adjustTargetNumExecutors = false, 
countFailures = false,
+            force = true)
         }
 
       case UpdateDelegationTokens(newDelegationTokens) =>
@@ -584,18 +585,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered 
a failure, and
-   * killed tasks that are running on the executor will count towards the 
failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the 
limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, 
default false
+   * @param adjustTargetNumExecutors whether the target number of executors be 
adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when 
they are killed, whether
+   *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
    */
   final override def killExecutors(
       executorIds: Seq[String],
-      replace: Boolean,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
@@ -610,7 +611,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       val executorsToKill = knownExecutors
         .filter { id => !executorsPendingToRemove.contains(id) }
         .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = 
!countFailures }
 
       logInfo(s"Actual list of executor(s) to be killed is 
${executorsToKill.mkString(", ")}")
 
@@ -618,12 +619,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       // with the cluster manager to avoid allocating new ones. When computing 
the new target,
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
-        if (!replace) {
+        if (adjustTargetNumExecutors) {
           requestedTotalExecutors = math.max(requestedTotalExecutors - 
executorsToKill.size, 0)
           if (requestedTotalExecutors !=
               (numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size)) {
             logDebug(
-              s"""killExecutors($executorIds, $replace, $force): Executor 
counts do not match:
+              s"""killExecutors($executorIds, $adjustTargetNumExecutors, 
$countFailures, $force):
+                 |Executor counts do not match:
                  |requestedTotalExecutors  = $requestedTotalExecutors
                  |numExistingExecutors     = $numExistingExecutors
                  |numPendingExecutors      = $numPendingExecutors

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a0cae5a..9807d12 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark
 
 import scala.collection.mutable
 
+import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
@@ -26,6 +28,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.ManualClock
 
 /**
@@ -1050,6 +1053,66 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager) === Map.empty)
   }
 
+  test("SPARK-23365 Don't update target num executors when killing idle 
executors") {
+    val minExecutors = 1
+    val initialExecutors = 1
+    val maxExecutors = 2
+    val conf = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
+      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+      .set("spark.dynamicAllocation.initialExecutors", 
initialExecutors.toString)
+      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1000ms")
+      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", 
"1000ms")
+      .set("spark.dynamicAllocation.executorIdleTimeout", s"3000ms")
+    val mockAllocationClient = mock(classOf[ExecutorAllocationClient])
+    val mockBMM = mock(classOf[BlockManagerMaster])
+    val manager = new ExecutorAllocationManager(
+      mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockBMM)
+    val clock = new ManualClock()
+    manager.setClock(clock)
+
+    when(mockAllocationClient.requestTotalExecutors(meq(2), any(), 
any())).thenReturn(true)
+    // test setup -- job with 2 tasks, scale up to two executors
+    assert(numExecutorsTarget(manager) === 1)
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, 
Map.empty)))
+    
manager.listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0,
 2)))
+    clock.advance(1000)
+    manager invokePrivate 
_updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    assert(numExecutorsTarget(manager) === 2)
+    val taskInfo0 = createTaskInfo(0, 0, "executor-1")
+    manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo0))
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, 
Map.empty)))
+    val taskInfo1 = createTaskInfo(1, 1, "executor-2")
+    manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo1))
+    assert(numExecutorsTarget(manager) === 2)
+
+    // have one task finish -- we should adjust the target number of executors 
down
+    // but we should *not* kill any executors yet
+    manager.listener.onTaskEnd(SparkListenerTaskEnd(0, 0, null, Success, 
taskInfo0, null))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(numExecutorsTarget(manager) === 2)
+    clock.advance(1000)
+    manager invokePrivate 
_updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    assert(numExecutorsTarget(manager) === 1)
+    verify(mockAllocationClient, never).killExecutors(any(), any(), any(), 
any())
+
+    // now we cross the idle timeout for executor-1, so we kill it.  the 
really important
+    // thing here is that we do *not* ask the executor allocation client to 
adjust the target
+    // number of executors down
+    when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, 
false))
+      .thenReturn(Seq("executor-1"))
+    clock.advance(3000)
+    schedule(manager)
+    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
+    // here's the important verify -- we did kill the executors, but did not 
adjust the target count
+    verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, 
false, false)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
@@ -1268,7 +1331,8 @@ private class DummyLocalSchedulerBackend (sc: 
SparkContext, sb: SchedulerBackend
 
   override def killExecutors(
       executorIds: Seq[String],
-      replace: Boolean,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
       force: Boolean): Seq[String] = executorIds
 
   override def start(): Unit = sb.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index bf7480d..155564a 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -573,7 +573,8 @@ class StandaloneDynamicAllocationSuite
     syncExecutors(sc)
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(Seq(executorId), replace = false, force)
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = true, 
countFailures = false,
+          force)
       case _ => fail("expected coarse grained scheduler")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index cd1b7a9..88a57c5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -479,7 +479,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
 
   test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
-    when(allocationClientMock.killExecutors(any(), any(), 
any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutors(any(), any(), any(), 
any())).thenReturn(Seq("called"))
     when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new 
Answer[Boolean] {
       // To avoid a race between blacklisting and killing, it is important 
that the nodeBlacklist
       // is updated before we ask the executor allocation client to kill all 
the executors
@@ -517,7 +517,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist1.execToFailures)
 
-    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutors(any(), any(), any(), 
any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is 
called.
@@ -533,7 +533,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist2.execToFailures)
 
-    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
 
     val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1)
     // Fail 4 tasks in one task set on executor 2, so that executor gets 
blacklisted for the whole
@@ -545,13 +545,13 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist3.execToFailures)
 
-    verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock).killExecutors(Seq("2"), false, false, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
   test("fetch failure blacklisting kills executors, configured by 
BLACKLIST_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
-    when(allocationClientMock.killExecutors(any(), any(), 
any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutors(any(), any(), any(), 
any())).thenReturn(Seq("called"))
     when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new 
Answer[Boolean] {
       // To avoid a race between blacklisting and killing, it is important 
that the nodeBlacklist
       // is updated before we ask the executor allocation client to kill all 
the executors
@@ -571,7 +571,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     conf.set(config.BLACKLIST_KILL_ENABLED, false)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
-    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutors(any(), any(), any(), 
any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is 
called.
@@ -580,7 +580,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     clock.advance(1000)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
-    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     assert(blacklist.executorIdToBlacklistStatus.contains("1"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to