This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 548ac7c [SPARK-31198][CORE] Use graceful decommissioning as part of
dynamic scaling
548ac7c is described below
commit 548ac7c4af2270a6bdbf7a6f29f4846eecdc0171
Author: Holden Karau <[email protected]>
AuthorDate: Wed Aug 12 17:07:18 2020 -0700
[SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling
### What changes were proposed in this pull request?
If graceful decommissioning is enabled, Spark's dynamic scaling uses this
instead of directly killing executors.
### Why are the changes needed?
When scaling down Spark we should avoid triggering recomputes as much as
possible.
### Does this PR introduce _any_ user-facing change?
Hopefully their jobs run faster or at the same speed. It also enables
experimental shuffle service free dynamic scaling when graceful decommissioning
is enabled (using the same code as the shuffle tracking dynamic scaling).
### How was this patch tested?
For now I've extended the ExecutorAllocationManagerSuite for both core &
streaming.
Closes #29367 from
holdenk/SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling.
Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
---
.../apache/spark/ExecutorAllocationClient.scala | 40 +++++
.../apache/spark/ExecutorAllocationManager.scala | 30 +++-
.../cluster/CoarseGrainedSchedulerBackend.scala | 190 ++++++++++++---------
.../cluster/StandaloneSchedulerBackend.scala | 3 +-
.../spark/scheduler/dynalloc/ExecutorMonitor.scala | 61 ++++++-
.../org/apache/spark/storage/BlockManager.scala | 2 +-
.../apache/spark/storage/BlockManagerMaster.scala | 6 +-
.../spark/ExecutorAllocationManagerSuite.scala | 71 +++++++-
.../WorkerDecommissionExtendedSuite.scala | 3 +-
.../spark/scheduler/WorkerDecommissionSuite.scala | 4 +-
.../BlockManagerDecommissionIntegrationSuite.scala | 7 +-
project/SparkBuild.scala | 2 +
.../docker/src/main/dockerfiles/spark/decom.sh | 2 +-
.../k8s/integrationtest/KubernetesSuite.scala | 27 ++-
.../integration-tests/tests/decommissioning.py | 5 -
.../scheduler/ExecutorAllocationManager.scala | 10 +-
.../scheduler/ExecutorAllocationManagerSuite.scala | 51 ++++--
17 files changed, 380 insertions(+), 134 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 00bd006..079340a 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+
/**
* A client that communicates with the cluster manager to request or kill
executors.
* This is currently supported only in YARN mode.
@@ -82,6 +84,44 @@ private[spark] trait ExecutorAllocationClient {
force: Boolean = false): Seq[String]
/**
+ * Request that the cluster manager decommission the specified executors.
+ * Default implementation delegates to kill, scheduler must override
+ * if it supports graceful decommissioning.
+ *
+ * @param executorsAndDecomInfo identifiers of executors & decom info.
+ * @param adjustTargetNumExecutors whether the target number of executors
will be adjusted down
+ * after these executors have been
decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to
be removed.
+ */
+ def decommissionExecutors(
+ executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+ killExecutors(executorsAndDecomInfo.map(_._1),
+ adjustTargetNumExecutors,
+ countFailures = false)
+ }
+
+
+ /**
+ * Request that the cluster manager decommission the specified executor.
+ * Delegates to decommissionExecutors.
+ *
+ * @param executorId identifiers of executor to decommission
+ * @param decommissionInfo information about the decommission (reason, host
loss)
+ * @param adjustTargetNumExecutors if we should adjust the target number of
executors.
+ * @return whether the request is acknowledged by the cluster manager.
+ */
+ final def decommissionExecutor(executorId: String,
+ decommissionInfo: ExecutorDecommissionInfo,
+ adjustTargetNumExecutors: Boolean): Boolean = {
+ val decommissionedExecutors = decommissionExecutors(
+ Array((executorId, decommissionInfo)),
+ adjustTargetNumExecutors = adjustTargetNumExecutors)
+ decommissionedExecutors.nonEmpty &&
decommissionedExecutors(0).equals(executorId)
+ }
+
+
+ /**
* Request that the cluster manager kill every executor on the specified
host.
*
* @return whether the request is acknowledged by the cluster manager.
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1570f86..1cb840f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
import org.apache.spark.resource.ResourceProfileManager
@@ -127,6 +128,8 @@ private[spark] class ExecutorAllocationManager(
private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
+ private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
+
private val defaultProfileId =
resourceProfileManager.defaultResourceProfile.id
validateSettings()
@@ -204,7 +207,12 @@ private[spark] class ExecutorAllocationManager(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be >
0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
- if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+ // If dynamic allocation shuffle tracking or worker decommissioning
along with
+ // storage shuffle decommissioning is enabled we have *experimental*
support for
+ // decommissioning without a shuffle service.
+ if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+ (decommissionEnabled &&
+ conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the
external " +
@@ -539,7 +547,9 @@ private[spark] class ExecutorAllocationManager(
// get the running total as we remove or initialize it to the count -
pendingRemoval
val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
(executorMonitor.executorCountWithResourceProfile(rpId) -
- executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
+ executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
+ executorMonitor.decommissioningPerResourceProfileId(rpId)
+ ))
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because
there " +
s"are only $newExecutorTotal executor(s) left (minimum number of
executor limit " +
@@ -565,8 +575,14 @@ private[spark] class ExecutorAllocationManager(
} else {
// We don't want to change our target number of executors, because we
already did that
// when the task backlog decreased.
- client.killExecutors(executorIdsToBeRemoved.toSeq,
adjustTargetNumExecutors = false,
- countFailures = false, force = false)
+ if (decommissionEnabled) {
+ val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+ id => (id, ExecutorDecommissionInfo("spark scale down",
false))).toArray
+ client.decommissionExecutors(executorIdsWithoutHostLoss,
adjustTargetNumExecutors = false)
+ } else {
+ client.killExecutors(executorIdsToBeRemoved.toSeq,
adjustTargetNumExecutors = false,
+ countFailures = false, force = false)
+ }
}
// [SPARK-21834] killExecutors api reduces the target number of executors.
@@ -578,7 +594,11 @@ private[spark] class ExecutorAllocationManager(
// reset the newExecutorTotal to the existing number of executors
if (testing || executorsRemoved.nonEmpty) {
- executorMonitor.executorsKilled(executorsRemoved.toSeq)
+ if (decommissionEnabled) {
+ executorMonitor.executorsDecommissioned(executorsRemoved)
+ } else {
+ executorMonitor.executorsKilled(executorsRemoved.toSeq)
+ }
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to
idle timeout.")
executorsRemoved.toSeq
} else {
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 200f2d8..ca65731 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}:
$decommissionInfo")
- decommissionExecutor(executorId, decommissionInfo)
+ decommissionExecutor(executorId, decommissionInfo,
adjustTargetNumExecutors = false)
case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
@@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}:
${decommissionInfo}.")
- decommissionExecutor(executorId, decommissionInfo)
- context.reply(true)
+ context.reply(decommissionExecutor(executorId, decommissionInfo,
+ adjustTargetNumExecutors = false))
case RetrieveSparkAppConfig(resourceProfileId) =>
val rp =
scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
@@ -420,59 +420,6 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
/**
- * Mark a given executor as decommissioned and stop making resource offers
for it.
- */
- private def decommissionExecutor(
- executorId: String, decommissionInfo: ExecutorDecommissionInfo):
Boolean = {
- val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
- // Only bother decommissioning executors which are alive.
- if (isExecutorActive(executorId)) {
- executorsPendingDecommission += executorId
- true
- } else {
- false
- }
- }
-
- if (shouldDisable) {
- logInfo(s"Starting decommissioning executor $executorId.")
- try {
- scheduler.executorDecommission(executorId, decommissionInfo)
- } catch {
- case e: Exception =>
- logError(s"Unexpected error during decommissioning ${e.toString}",
e)
- }
- // Send decommission message to the executor, this may be a duplicate
since the executor
- // could have been the one to notify us. But it's also possible the
notification came from
- // elsewhere and the executor does not yet know.
- executorDataMap.get(executorId) match {
- case Some(executorInfo) =>
- executorInfo.executorEndpoint.send(DecommissionSelf)
- case None =>
- // Ignoring the executor since it is not registered.
- logWarning(s"Attempted to decommission unknown executor
$executorId.")
- }
- logInfo(s"Finished decommissioning executor $executorId.")
-
- if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
- try {
- logInfo("Starting decommissioning block manager corresponding to "
+
- s"executor $executorId.")
-
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
- } catch {
- case e: Exception =>
- logError("Unexpected error during block manager " +
- s"decommissioning for executor $executorId: ${e.toString}", e)
- }
- logInfo(s"Acknowledged decommissioning block manager corresponding
to $executorId.")
- }
- } else {
- logInfo(s"Skipping decommissioning of executor $executorId.")
- }
- shouldDisable
- }
-
- /**
* Stop making resource offers for the given executor. The executor is
marked as lost with
* the loss reason still pending.
*
@@ -503,6 +450,87 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors
will be adjusted down
+ * after these executors have been
decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to
be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case
(executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
+ adjustExecutors(executorsToDecommission.map(_._1))
+ }
+
+ executorsToDecommission.filter { case (executorId, decomInfo) =>
+ doDecommission(executorId, decomInfo)
+ }.map(_._1)
+ }
+
+
+ private def doDecommission(executorId: String,
+ decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+ logInfo(s"Asking executor $executorId to decommission.")
+ try {
+ scheduler.executorDecommission(executorId, decomInfo)
+ if (driverEndpoint != null) {
+ logInfo("Propagating executor decommission to driver.")
+ driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Unexpected error during decommissioning ${e.toString}", e)
+ return false
+ }
+ // Send decommission message to the executor (it could have originated on
the executor
+ // but not necessarily).
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ executorDataMap.get(executorId) match {
+ case Some(executorInfo) =>
+ executorInfo.executorEndpoint.send(DecommissionSelf)
+ case None =>
+ // Ignoring the executor since it is not registered.
+ logWarning(s"Attempted to decommission unknown executor
$executorId.")
+ return false
+ }
+ }
+ logInfo(s"Asked executor $executorId to decommission.")
+
+ if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ try {
+ logInfo(s"Asking block manager corresponding to executor $executorId
to decommission.")
+
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+ } catch {
+ case e: Exception =>
+ logError("Unexpected error during block manager " +
+ s"decommissioning for executor $executorId: ${e.toString}", e)
+ return false
+ }
+ logInfo(s"Acknowledged decommissioning block manager corresponding to
$executorId.")
+ }
+
+ true
+ }
+
+
override def start(): Unit = {
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager = createTokenManager()
@@ -598,17 +626,6 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
driverEndpoint.send(RemoveWorker(workerId, host, message))
}
- /**
- * Called by subclasses when notified of a decommissioning executor.
- */
- private[spark] def decommissionExecutor(
- executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
- if (driverEndpoint != null) {
- logInfo("Propagating executor decommission to driver.")
- driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
- }
- }
-
def sufficientResourcesRegistered(): Boolean = true
override def isReady(): Boolean = {
@@ -761,6 +778,31 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
Future.successful(false)
/**
+ * Adjust the number of executors being requested to no longer include the
provided executors.
+ */
+ private def adjustExecutors(executorIds: Seq[String]) = {
+ if (executorIds.nonEmpty) {
+ executorIds.foreach { exec =>
+ withLock {
+ val rpId = executorDataMap(exec).resourceProfileId
+ val rp =
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+ if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+ // Assume that we are killing an executor that was started by
default and
+ // not through the request api
+ requestedTotalExecutorsPerResourceProfile(rp) = 0
+ } else {
+ val requestedTotalForRp =
requestedTotalExecutorsPerResourceProfile(rp)
+ requestedTotalExecutorsPerResourceProfile(rp) =
math.max(requestedTotalForRp - 1, 0)
+ }
+ }
+ }
+ doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+ } else {
+ Future.successful(true)
+ }
+ }
+
+ /**
* Request that the cluster manager kill the specified executors.
*
* @param executorIds identifiers of executors to kill
@@ -798,19 +840,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (adjustTargetNumExecutors) {
- executorsToKill.foreach { exec =>
- val rpId = executorDataMap(exec).resourceProfileId
- val rp =
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
- if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
- // Assume that we are killing an executor that was started by
default and
- // not through the request api
- requestedTotalExecutorsPerResourceProfile(rp) = 0
- } else {
- val requestedTotalForRp =
requestedTotalExecutorsPerResourceProfile(rp)
- requestedTotalExecutorsPerResourceProfile(rp) =
math.max(requestedTotalForRp - 1, 0)
- }
- }
-
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+ adjustExecutors(executorsToKill)
} else {
Future.successful(true)
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d921af6..3acb6f1 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend(
override def executorDecommissioned(fullId: String, decommissionInfo:
ExecutorDecommissionInfo) {
logInfo("Asked to decommission executor")
- decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+ val execId = fullId.split("/")(1)
+ decommissionExecutors(Array((execId, decommissionInfo)),
adjustTargetNumExecutors = false)
logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 4d71907..8dbdc84 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
import org.apache.spark.scheduler._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId}
import org.apache.spark.util.Clock
/**
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
var newNextTimeout = Long.MaxValue
timedOutExecs = executors.asScala
- .filter { case (_, exec) => !exec.pendingRemoval &&
!exec.hasActiveShuffle }
+ .filter { case (_, exec) =>
+ !exec.pendingRemoval && !exec.hasActiveShuffle &&
!exec.decommissioning}
.filter { case (_, exec) =>
val deadline = exec.timeoutAt
if (deadline > now) {
@@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor(
/**
* Mark the given executors as pending to be removed. Should only be called
in the EAM thread.
+ * This covers both kills and decommissions.
*/
def executorsKilled(ids: Seq[String]): Unit = {
ids.foreach { id =>
@@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor(
nextTimeout.set(Long.MinValue)
}
+ private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {
+ ids.foreach { id =>
+ val tracker = executors.get(id)
+ if (tracker != null) {
+ tracker.decommissioning = true
+ }
+ }
+
+ // Recompute timed out executors in the next EAM callback, since this call
invalidates
+ // the current list.
+ nextTimeout.set(Long.MinValue)
+ }
+
def executorCount: Int = executors.size()
def executorCountWithResourceProfile(id: Int): Int = {
@@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor(
executors.asScala.filter { case (k, v) => v.resourceProfileId == id &&
v.pendingRemoval }.size
}
+ def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+ exec.decommissioning
+ }
+
+ def decommissioningPerResourceProfileId(id: Int): Int = {
+ executors.asScala.filter { case (k, v) =>
+ v.resourceProfileId == id && v.decommissioning
+ }.size
+ }
+
override def onJobStart(event: SparkListenerJobStart): Unit = {
if (!shuffleTrackingEnabled) {
return
@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
//
// This means that an executor may be marked as having shuffle data, and
thus prevented
// from being removed, even though the data may not be used.
+ // TODO: Only track used files (SPARK-31974)
if (shuffleTrackingEnabled && event.reason == Success) {
stageToShuffleID.get(event.stageId).foreach { shuffleId =>
exec.addShuffle(shuffleId)
@@ -326,18 +352,35 @@ private[spark] class ExecutorMonitor(
val removed = executors.remove(event.executorId)
if (removed != null) {
decrementExecResourceProfileCount(removed.resourceProfileId)
- if (!removed.pendingRemoval) {
+ if (!removed.pendingRemoval && !removed.decommissioning) {
nextTimeout.set(Long.MinValue)
}
}
}
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+ val exec =
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
+ UNKNOWN_RESOURCE_PROFILE_ID)
+
+ // Check if it is a shuffle file, or RDD to pick the correct codepath for
update
if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+ if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] &&
+ shuffleTrackingEnabled) {
+ /**
+ * The executor monitor keeps track of locations of cache and shuffle
blocks and this can
+ * be used to decide which executor(s) Spark should shutdown first.
Since we move shuffle
+ * blocks around now this wires it up so that it keeps track of it. We
only do this for
+ * data blocks as index and other blocks do not necessarily
mean the entire block
+ * has been committed.
+ */
+ event.blockUpdatedInfo.blockId match {
+ case ShuffleDataBlockId(shuffleId, _, _) =>
exec.addShuffle(shuffleId)
+ case _ => // For now we only update on data blocks
+ }
+ }
return
}
- val exec =
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
- UNKNOWN_RESOURCE_PROFILE_ID)
+
val storageLevel = event.blockUpdatedInfo.storageLevel
val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
@@ -410,10 +453,15 @@ private[spark] class ExecutorMonitor(
}
// Visible for testing
- def executorsPendingToRemove(): Set[String] = {
+ private[spark] def executorsPendingToRemove(): Set[String] = {
executors.asScala.filter { case (_, exec) => exec.pendingRemoval
}.keys.toSet
}
+ // Visible for testing
+ private[spark] def executorsDecommissioning(): Set[String] = {
+ executors.asScala.filter { case (_, exec) => exec.decommissioning
}.keys.toSet
+ }
+
/**
* This method should be used when updating executor state. It guards
against a race condition in
* which the `SparkListenerTaskStart` event is posted before the
`SparkListenerBlockManagerAdded`
@@ -466,6 +514,7 @@ private[spark] class ExecutorMonitor(
@volatile var timedOut: Boolean = false
var pendingRemoval: Boolean = false
+ var decommissioning: Boolean = false
var hasActiveShuffle: Boolean = false
private var idleStart: Long = -1
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6ec93df..ee534f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1822,7 +1822,7 @@ private[spark] class BlockManager(
}
}
- /*
+ /**
* Returns the last migration time and a boolean denoting if all the blocks
have been migrated.
* If there are any tasks running since that time the boolean may be
incorrect.
*/
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 93492cc..f544d47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,9 +43,11 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}
- /** Decommission block managers corresponding to given set of executors */
+ /** Decommission block managers corresponding to given set of executors
+ * Non-blocking.
+ */
def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
- driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
+ driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
}
/** Get Replication Info for all the RDD blocks stored in given
blockManagerId */
diff --git
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5b367d2..3abe051 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends
SparkFunSuite {
assert(executorsPendingToRemove(manager).size === 6) // limit reached (1
executor remaining)
}
+ test("mock polling loop remove with decommissioning") {
+ val clock = new ManualClock(2020L)
+ val manager = createManager(createConf(1, 20, 1, true), clock = clock)
+
+ // Remove idle executors on timeout
+ onExecutorAddedDefaultProfile(manager, "executor-1")
+ onExecutorAddedDefaultProfile(manager, "executor-2")
+ onExecutorAddedDefaultProfile(manager, "executor-3")
+ assert(executorsDecommissioning(manager).isEmpty)
+ assert(executorsPendingToRemove(manager).isEmpty)
+
+ // idle threshold not reached yet
+ clock.advance(executorIdleTimeout * 1000 / 2)
+ schedule(manager)
+ assert(manager.executorMonitor.timedOutExecutors().isEmpty)
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(executorsDecommissioning(manager).isEmpty)
+
+ // idle threshold exceeded
+ clock.advance(executorIdleTimeout * 1000)
+ assert(manager.executorMonitor.timedOutExecutors().size === 3)
+ schedule(manager)
+ assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1
executor remaining)
+ assert(executorsDecommissioning(manager).size === 2) // limit reached (1
executor remaining)
+
+ // Mark a subset as busy - only idle executors should be removed
+ onExecutorAddedDefaultProfile(manager, "executor-4")
+ onExecutorAddedDefaultProfile(manager, "executor-5")
+ onExecutorAddedDefaultProfile(manager, "executor-6")
+ onExecutorAddedDefaultProfile(manager, "executor-7")
+ assert(manager.executorMonitor.executorCount === 7)
+ assert(executorsPendingToRemove(manager).isEmpty) // no pending to be
removed
+ assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning
+ onExecutorBusy(manager, "executor-4")
+ onExecutorBusy(manager, "executor-5")
+ onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5
active ones)
+
+ // after scheduling, the previously timed out executor should be removed,
since
+ // there are new active ones.
+ schedule(manager)
+ assert(executorsDecommissioning(manager).size === 3)
+
+ // advance the clock so that idle executors should time out and move to
the pending list
+ clock.advance(executorIdleTimeout * 1000)
+ schedule(manager)
+ assert(executorsPendingToRemove(manager).size === 0)
+ assert(executorsDecommissioning(manager).size === 4)
+ assert(!executorsDecommissioning(manager).contains("executor-4"))
+ assert(!executorsDecommissioning(manager).contains("executor-5"))
+ assert(!executorsDecommissioning(manager).contains("executor-6"))
+
+ // Busy executors are now idle and should be removed
+ onExecutorIdle(manager, "executor-4")
+ onExecutorIdle(manager, "executor-5")
+ onExecutorIdle(manager, "executor-6")
+ schedule(manager)
+ assert(executorsDecommissioning(manager).size === 4)
+ clock.advance(executorIdleTimeout * 1000)
+ schedule(manager)
+ assert(executorsDecommissioning(manager).size === 6) // limit reached (1
executor remaining)
+ }
+
test("listeners trigger add executors correctly") {
val manager = createManager(createConf(1, 20, 1))
assert(addTime(manager) === NOT_SET)
@@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends
SparkFunSuite {
private def createConf(
minExecutors: Int = 1,
maxExecutors: Int = 5,
- initialExecutors: Int = 1): SparkConf = {
+ initialExecutors: Int = 1,
+ decommissioningEnabled: Boolean = false): SparkConf = {
val sparkConf = new SparkConf()
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
@@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends
SparkFunSuite {
// SPARK-22864: effectively disable the allocation schedule by setting
the period to a
// really long value.
.set(TEST_SCHEDULE_INTERVAL, 30000L)
+ .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled)
sparkConf
}
@@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends
SparkFunSuite {
private def executorsPendingToRemove(manager: ExecutorAllocationManager):
Set[String] = {
manager.executorMonitor.executorsPendingToRemove()
}
+
+ private def executorsDecommissioning(manager: ExecutorAllocationManager):
Set[String] = {
+ manager.executorMonitor.executorsDecommissioning()
+ }
}
/**
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index d95deb1..6bfd3f7 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -65,7 +65,8 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite
with LocalSparkConte
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
sc.getExecutorIds().tail.foreach { id =>
- sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
+ sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false),
+ adjustTargetNumExecutors = false)
assert(rdd3.sortByKey().collect().length === 100)
}
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index bb0c33a..ea5be21 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with
LocalSparkContext {
// decom.sh message passing is tested manually.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- execs.foreach(execId => sched.decommissionExecutor(execId,
ExecutorDecommissionInfo("", false)))
+ // Make the executors decommission, finish, exit, and not be replaced.
+ val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("",
false))).toArray
+ sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors =
true)
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 7cf0083..82f87a5 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -188,9 +188,12 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
val execToDecommission = getCandidateExecutorToDecom.get
logInfo(s"Decommissioning executor ${execToDecommission}")
+
+ // Decommission executor and ensure it is not relaunched by setting
adjustTargetNumExecutors
sched.decommissionExecutor(
execToDecommission,
- ExecutorDecommissionInfo("", isHostDecommissioned = false))
+ ExecutorDecommissionInfo("", isHostDecommissioned = false),
+ adjustTargetNumExecutors = true)
val decomTime = new SystemClock().getTimeMillis()
// Wait for job to finish.
@@ -276,6 +279,8 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
}
// Wait for the executor to be removed automatically after migration.
+ // This is set to a high value since GitHub Actions sometimes has high
latency,
+ // but I've never seen this take more than a minute.
assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))
// Since the RDD is cached or shuffled so further usage of same RDD should
use the
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5a3ac21..110c311 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -838,6 +838,8 @@ object Unidoc {
f.getCanonicalPath.contains("org/apache/spark/shuffle") &&
!f.getCanonicalPath.contains("org/apache/spark/shuffle/api")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/executor")))
+
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/ExecutorAllocationClient")))
+
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend")))
.map(_.filterNot(f =>
f.getCanonicalPath.contains("org/apache/spark/unsafe") &&
!f.getCanonicalPath.contains("org/apache/spark/unsafe/types/CalendarInterval")))
diff --git
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
index 8a5208d..cd973df 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
@@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null
date
echo "Done"
date
-sleep 30
+sleep 1
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 279386d..28ab371 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -279,6 +279,7 @@ class KubernetesSuite extends SparkFunSuite
appArgs = appArgs)
val execPods = scala.collection.mutable.Map[String, Pod]()
+ val podsDeleted = scala.collection.mutable.HashSet[String]()
val (patienceInterval, patienceTimeout) = {
executorPatience match {
case Some(patience) => (patience._1.getOrElse(INTERVAL),
patience._2.getOrElse(TIMEOUT))
@@ -339,27 +340,21 @@ class KubernetesSuite extends SparkFunSuite
}
// Delete the pod to simulate cluster scale down/migration.
// This will allow the pod to remain up for the grace period
- val pod = kubernetesTestComponents.kubernetesClient.pods()
- .withName(name)
- pod.delete()
+ kubernetesTestComponents.kubernetesClient.pods()
+ .withName(name).delete()
logDebug(s"Triggered pod decom/delete: $name deleted")
- // Look for the string that indicates we should force kill the
first
- // Executor. This simulates the pod being fully lost.
- logDebug("Waiting for second collect...")
+ // Make sure this pod is deleted
Eventually.eventually(TIMEOUT, INTERVAL) {
- assert(kubernetesTestComponents.kubernetesClient
- .pods()
- .withName(driverPodName)
- .getLog
- .contains("Waiting some more, please kill exec 1."),
- "Decommission test did not complete second collect.")
+ assert(podsDeleted.contains(name))
+ }
+ // Then make sure this pod is replaced
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ assert(execPods.size == 3)
}
- logDebug("Force deleting")
- val podNoGrace = pod.withGracePeriod(0)
- podNoGrace.delete()
}
case Action.DELETED | Action.ERROR =>
execPods.remove(name)
+ podsDeleted += name
}
}
})
@@ -388,7 +383,6 @@ class KubernetesSuite extends SparkFunSuite
Eventually.eventually(TIMEOUT, patienceInterval) {
execPods.values.nonEmpty should be (true)
}
- execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
Eventually.eventually(patienceTimeout, patienceInterval) {
expectedLogOnCompletion.foreach { e =>
@@ -400,6 +394,7 @@ class KubernetesSuite extends SparkFunSuite
s"The application did not complete, did not find str ${e}")
}
}
+ execWatcher.close()
}
protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
diff --git
a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
index d34e616..5fcad08 100644
--- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
+++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
@@ -47,11 +47,6 @@ if __name__ == "__main__":
print("...")
time.sleep(30)
rdd.count()
- print("Waiting some more, please kill exec 1.")
- print("...")
- time.sleep(30)
- print("Executor node should be deleted now")
- rdd.count()
rdd.collect()
print("Final accumulator value is: " + str(acc.value))
print("Finished waiting, stopping Spark.")
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 58bd56c..a4b7b7a2 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -23,7 +23,9 @@ import scala.util.Random
import org.apache.spark.{ExecutorAllocationClient, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, Utils}
@@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager(
logDebug(s"Removable executors (${removableExecIds.size}):
${removableExecIds}")
if (removableExecIds.nonEmpty) {
val execIdToRemove =
removableExecIds(Random.nextInt(removableExecIds.size))
- client.killExecutor(execIdToRemove)
+ if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+ client.decommissionExecutor(execIdToRemove,
+ ExecutorDecommissionInfo("spark scale down", false),
+ adjustTargetNumExecutors = true)
+ } else {
+ client.killExecutor(execIdToRemove)
+ }
logInfo(s"Requested to kill executor $execIdToRemove")
} else {
logInfo(s"No non-receiver executors to kill")
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 65efa10..9e06625 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -27,7 +27,9 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.spark.{ExecutorAllocationClient, SparkConf}
import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED,
DYN_ALLOCATION_TESTING}
import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.streaming.{DummyInputDStream, Seconds,
StreamingContext, TestSuiteBase}
import org.apache.spark.util.{ManualClock, Utils}
@@ -44,11 +46,22 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
}
test("basic functionality") {
+ basicTest(decommissioning = false)
+ }
+
+ test("basic decommissioning") {
+ basicTest(decommissioning = true)
+ }
+
+ def basicTest(decommissioning: Boolean): Unit = {
// Test that adding batch processing time info to allocation manager
// causes executors to be requested and killed accordingly
+ conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning)
// There is 1 receiver, and exec 1 has been allocated to it
- withAllocationManager(numReceivers = 1) { case (receiverTracker,
allocationManager) =>
+ withAllocationManager(numReceivers = 1, conf = conf) {
+ case (receiverTracker, allocationManager) =>
+
when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1")))
/** Add data point for batch processing time and verify executor
allocation */
@@ -83,53 +96,67 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
Map.empty)}
}
- /** Verify that a particular executor was killed */
- def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
- if (expectedKilledExec.nonEmpty) {
- verify(allocationClient,
times(1)).killExecutor(meq(expectedKilledExec.get))
+ /** Verify that a particular executor was scaled down. */
+ def verifyScaledDownExec(expectedExec: Option[String]): Unit = {
+ if (expectedExec.nonEmpty) {
+ val decomInfo = ExecutorDecommissionInfo("spark scale down", false)
+ if (decommissioning) {
+ verify(allocationClient, times(1)).decommissionExecutor(
+ meq(expectedExec.get), meq(decomInfo), meq(true))
+ verify(allocationClient, never).killExecutor(meq(expectedExec.get))
+ } else {
+ verify(allocationClient,
times(1)).killExecutor(meq(expectedExec.get))
+ verify(allocationClient, never).decommissionExecutor(
+ meq(expectedExec.get), meq(decomInfo), meq(true))
+ }
} else {
- verify(allocationClient, never).killExecutor(null)
+ if (decommissioning) {
+ verify(allocationClient, never).decommissionExecutor(null, null,
false)
+ verify(allocationClient, never).decommissionExecutor(null, null,
true)
+ } else {
+ verify(allocationClient, never).killExecutor(null)
+ }
}
}
// Batch proc time = batch interval, should increase allocation by 1
addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
verifyTotalRequestedExecs(Some(3)) // one already allocated, increase
allocation by 1
- verifyKilledExec(None)
+ verifyScaledDownExec(None)
}
// Batch proc time = batch interval * 2, should increase allocation by 2
addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
verifyTotalRequestedExecs(Some(4))
- verifyKilledExec(None)
+ verifyScaledDownExec(None)
}
// Batch proc time slightly more than the scale up ratio, should
increase allocation by 1
addBatchProcTimeAndVerifyAllocation(
batchDurationMillis *
STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) {
verifyTotalRequestedExecs(Some(3))
- verifyKilledExec(None)
+ verifyScaledDownExec(None)
}
// Batch proc time slightly less than the scale up ratio, should not
change allocation
addBatchProcTimeAndVerifyAllocation(
batchDurationMillis *
STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) {
verifyTotalRequestedExecs(None)
- verifyKilledExec(None)
+ verifyScaledDownExec(None)
}
// Batch proc time slightly more than the scale down ratio, should not
change allocation
addBatchProcTimeAndVerifyAllocation(
batchDurationMillis *
STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) {
verifyTotalRequestedExecs(None)
- verifyKilledExec(None)
+ verifyScaledDownExec(None)
}
// Batch proc time slightly more than the scale down ratio, should not
change allocation
addBatchProcTimeAndVerifyAllocation(
batchDurationMillis *
STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) {
verifyTotalRequestedExecs(None)
- verifyKilledExec(Some("2"))
+ verifyScaledDownExec(Some("2"))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]