This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 375d348 [SPARK-31197][CORE] Shutdown executor once we are done
decommissioning
375d348 is described below
commit 375d348a83e6ffa38dfaece5047633f67aee1da5
Author: Holden Karau <[email protected]>
AuthorDate: Wed Aug 5 16:28:14 2020 -0700
[SPARK-31197][CORE] Shutdown executor once we are done decommissioning
### What changes were proposed in this pull request?
Exit the executor when it has been asked to decommission and there is
nothing left for it to do.
This is a rebase of https://github.com/apache/spark/pull/28817
### Why are the changes needed?
If we want to use decommissioning in Spark's own scale down we should
terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no
longer need if we've been asked to shutdown by the cluster manager instead of
always holding the resources as long as possible.
### Does this PR introduce _any_ user-facing change?
The decommissioned executors will exit at the end of decommissioning. This
is sort of a user-facing change; however, decommissioning hasn't been in any
releases yet.
### How was this patch tested?
I changed the unit test to not send the executor exit message and still
wait on the executor exited message.
Closes #29211 from holdenk/SPARK-31197-exit-execs-redone.
Authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
---
.../org/apache/spark/deploy/DeployMessage.scala | 2 -
.../org/apache/spark/deploy/worker/Worker.scala | 2 +-
.../executor/CoarseGrainedExecutorBackend.scala | 58 ++++++++-
.../cluster/CoarseGrainedClusterMessage.scala | 3 +
.../cluster/CoarseGrainedSchedulerBackend.scala | 10 ++
.../org/apache/spark/storage/BlockManager.scala | 8 ++
.../spark/storage/BlockManagerDecommissioner.scala | 96 +++++++++++---
.../spark/scheduler/WorkerDecommissionSuite.scala | 19 ++-
.../BlockManagerDecommissionIntegrationSuite.scala | 17 ++-
.../BlockManagerDecommissionUnitSuite.scala | 139 ++++++++++++++++++++-
10 files changed, 310 insertions(+), 44 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c8c6e5a..b7a64d75 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -165,8 +165,6 @@ private[deploy] object DeployMessages {
case object ReregisterWithMaster // used when a worker attempts to reconnect
to a master
- case object DecommissionSelf // Mark as decommissioned. May be Master to
Worker in the future.
-
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription,
driver: RpcEndpointRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index aa8c46f..862e685 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)
- case DecommissionSelf =>
+ case WorkerDecommission(_, _) =>
decommissionSelf()
}
diff --git
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index def125b..55fb76b 100644
---
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(
private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
- @volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None
// If this CoarseGrainedExecutorBackend is changed to support multiple
threads, then this may need
@@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend(
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String,
ResourceInformation]]
+ @volatile private var decommissioned = false
+
override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
@@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend(
case UpdateDelegationTokens(tokenBytes) =>
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+
+ case DecommissionSelf =>
+ logInfo("Received decommission self")
+ decommissionSelf()
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured
migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we
want to minimize the
+ // overhead for executors that are not in decommissioning state as
overall that will be
+ // more of the executors. For example, this will not catch a block which
is already in
+ // the process of being put from a remote executor before migration
starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking
structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
+ override def run(): Unit = {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
+ Thread.sleep(sleep_time)
+ if (executor == null || executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ logInfo("No running tasks, checking migrations")
+ val (migrationTime, allBlocksMigrated) =
env.blockManager.lastMigrationInfo()
+ // We can only trust allBlocksMigrated boolean value if there
were no tasks running
+ // since the start of computing it.
+ if (allBlocksMigrated && (migrationTime >
lastTaskRunningTime)) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver =
true)
+ } else {
+ logInfo("All blocks not yet migrated.")
+ }
+ } else {
+ logInfo("No running tasks, no block migration configured,
stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver =
true)
+ }
+ } else {
+ logInfo("Blocked from shutdown by running
${executor.numRunningtasks} tasks")
+ // If there is a running task it could store blocks, so make
sure we wait for a
+ // migration loop to complete after the last task is done.
+ // Note: this is only advanced if there is a running task, if
there
+ // is no running task but the blocks are not done migrating this
does not
+ // move forward.
+ lastTaskRunningTime = System.nanoTime()
+ }
+ }
+ }
+ }
+ shutdownThread.setDaemon(true)
+ shutdownThread.start()
+
+ logInfo("Will exit when finished decommissioning")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
- logError(s"Error ${e} during attempt to decommission self")
+ logError("Unexpected error while decommissioning self", e)
false
}
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 91485f0..7242ab7 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
// The message to check if `CoarseGrainedSchedulerBackend` thinks the
executor is alive or not.
case class IsExecutorAlive(executorId: String) extends
CoarseGrainedClusterMessage
+
+ // Used to ask an executor to decommission itself. (Can be an internal
message)
+ case object DecommissionSelf extends CoarseGrainedClusterMessage
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8fbefae..d81a617 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}",
e)
}
+ // Send decommission message to the executor, this may be a duplicate
since the executor
+ // could have been the one to notify us. But it's also possible the
notification came from
+ // elsewhere and the executor does not yet know.
+ executorDataMap.get(executorId) match {
+ case Some(executorInfo) =>
+ executorInfo.executorEndpoint.send(DecommissionSelf)
+ case None =>
+ // Ignoring the executor since it is not registered.
+ logWarning(s"Attempted to decommission unknown executor
$executorId.")
+ }
logInfo(s"Finished decommissioning executor $executorId.")
if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 47af854..6ec93df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
}
}
+ /*
+ * Returns the last migration time and a boolean denoting if all the blocks
have been migrated.
+ * If there are any tasks running since that time the boolean may be
incorrect.
+ */
+ private[spark] def lastMigrationInfo(): (Long, Boolean) = {
+ decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false))
+ }
+
private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] =
master.getReplicateInfoForRDDBlocks(blockManagerId)
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 1cc7ef6..f0a8e47 100644
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -18,6 +18,7 @@
package org.apache.spark.storage
import java.util.concurrent.ExecutorService
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
private val maxReplicationFailuresForDecommission =
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+ // Used for tracking if our migrations are complete. Readable for testing
+ @volatile private[storage] var lastRDDMigrationTime: Long = 0
+ @volatile private[storage] var lastShuffleMigrationTime: Long = 0
+ @volatile private[storage] var rddBlocksLeft: Boolean = true
+ @volatile private[storage] var shuffleBlocksLeft: Boolean = true
+
/**
* This runnable consumes any shuffle blocks in the queue for migration.
This part of a
* producer/consumer where the main migration loop updates the queue of
blocks to be migrated
@@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner(
null)// class tag, we don't need for shuffle
logDebug(s"Migrated sub block ${blockId}")
}
- logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+ logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
} else {
logError(s"Skipping block ${shuffleBlockInfo} because it has
failed ${retryCount}")
}
+ numMigratedShuffles.incrementAndGet()
}
}
// This catch is intentionally outside of the while running block.
@@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner(
// Shuffles which are either in queue for migrations or migrated
private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
+ // Shuffles which have migrated. This is used to know when we are "done"; being
done can change
+ // if a new shuffle file is created by a running task.
+ private val numMigratedShuffles = new AtomicInteger(0)
+
// Shuffles which are queued for migration & number of retries so far.
+ // Visible in storage for testing.
private[storage] val shufflesToMigrate =
new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
// Set if we encounter an error attempting to migrate and stop.
@volatile private var stopped = false
+ @volatile private var stoppedRDD =
+ !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
+ @volatile private var stoppedShuffle =
+ !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
@@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner(
override def run(): Unit = {
assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
- while (!stopped && !Thread.interrupted()) {
+ while (!stopped && !stoppedRDD && !Thread.interrupted()) {
logInfo("Iterating on migrating from the block manager.")
+ // Validate we have peers to migrate to.
+ val peers = bm.getPeers(false)
+ // If we have no peers give up.
+ if (peers.isEmpty) {
+ stopped = true
+ stoppedRDD = true
+ }
try {
+ val startTime = System.nanoTime()
logDebug("Attempting to replicate all cached RDD blocks")
- decommissionRddCacheBlocks()
+ rddBlocksLeft = decommissionRddCacheBlocks()
+ lastRDDMigrationTime = startTime
logInfo("Attempt to replicate all cached blocks done")
logInfo(s"Waiting for ${sleepInterval} before refreshing
migrations.")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
- logInfo("Interrupted during migration, will not refresh
migrations.")
- stopped = true
+ logInfo("Interrupted during RDD migration, stopping")
+ stoppedRDD = true
case NonFatal(e) =>
- logError("Error occurred while trying to replicate for block
manager decommissioning.",
+ logError("Error occurred replicating RDD for block manager
decommissioning.",
e)
- stopped = true
+ stoppedRDD = true
}
}
}
@@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner(
override def run() {
assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
- while (!stopped && !Thread.interrupted()) {
+ while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
try {
logDebug("Attempting to replicate all shuffle blocks")
- refreshOffloadingShuffleBlocks()
+ val startTime = System.nanoTime()
+ shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
+ lastShuffleMigrationTime = startTime
logInfo("Done starting workers to migrate shuffle blocks")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
logInfo("Interrupted during migration, will not refresh
migrations.")
- stopped = true
+ stoppedShuffle = true
case NonFatal(e) =>
logError("Error occurred while trying to replicate for block
manager decommissioning.",
e)
- stopped = true
+ stoppedShuffle = true
}
}
}
@@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner(
* but rather shadows them.
* Requires an Indexed based shuffle resolver.
* Note: if called in testing please call stopOffloadingShuffleBlocks to
avoid thread leakage.
+ * Returns true if we are not done migrating shuffle blocks.
*/
- private[storage] def refreshOffloadingShuffleBlocks(): Unit = {
+ private[storage] def refreshOffloadingShuffleBlocks(): Boolean = {
// Update the queue of shuffles to be migrated
logInfo("Offloading shuffle blocks")
val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
@@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner(
deadPeers.foreach { peer =>
migrationPeers.get(peer).foreach(_.running = false)
}
+ // If we don't have anyone to migrate to give up
+ if (migrationPeers.values.find(_.running == true).isEmpty) {
+ stoppedShuffle = true
+ }
+ // If we found any new shuffles to migrate or otherwise have not migrated
everything.
+ newShufflesToMigrate.nonEmpty || migratingShuffles.size <
numMigratedShuffles.get()
}
/**
@@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner(
/**
* Tries to offload all cached RDD blocks from this BlockManager to peer
BlockManagers
* Visible for testing
+ * Returns true if we have not migrated all of our RDD blocks.
*/
- private[storage] def decommissionRddCacheBlocks(): Unit = {
+ private[storage] def decommissionRddCacheBlocks(): Boolean = {
val replicateBlocksInfo = bm.getMigratableRDDBlocks()
+ // Refresh peers and validate we have somewhere to move blocks.
if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
"for block manager decommissioning")
} else {
logWarning(s"Asked to decommission RDD cache blocks, but no blocks to
migrate")
- return
+ return false
}
// TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner(
if (blocksFailedReplication.nonEmpty) {
logWarning("Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
+ return true
}
+ return false
}
private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
}
logInfo("Stopped storage decommissioner")
}
+
+ /*
+ * Returns the last migration time and a boolean for if all blocks have
been migrated.
+ * The last migration time is calculated to be the minimum of the last
migration of any
+ * running migration (and if there are no current running migrations it is
set to Long.MaxValue).
+ * This provides a timestamp which, if there have been no tasks running
since that time,
+ * tells us that all blocks that can be migrated have been migrated off.
+ */
+ private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+ if (stopped || (stoppedRDD && stoppedShuffle)) {
+ // Since we don't have anything left to migrate ever (since we don't
restart once
+ // stopped), return that we're done with a validity timestamp that
doesn't expire.
+ (Long.MaxValue, true)
+ } else {
+ // Chose the min of the active times. See the function description for
more information.
+ val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
+ Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
+ } else if (!stoppedShuffle) {
+ lastShuffleMigrationTime
+ } else {
+ lastRDDMigrationTime
+ }
+
+ // Technically we could have blocks left if we encountered an error, but
those blocks will
+ // never be migrated, so we don't care about them.
+ val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) &&
(!rddBlocksLeft || stoppedRDD)
+ (lastMigrationTime, blocksMigrated)
+ }
+ }
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 3c34070..bb0c33a 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with
LocalSparkContext {
assert(sleepyRdd.count() === 10)
}
- test("verify a task with all workers decommissioned succeeds") {
+ test("verify a running task with all workers decommissioned succeeds") {
+ // Wait for the executors to come up
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = 2,
+ timeout = 30000) // 30s
+
val input = sc.parallelize(1 to 10)
// Listen for the job
val sem = new Semaphore(0)
@@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with
LocalSparkContext {
sem.release()
}
})
- TestUtils.waitUntilExecutorsUp(sc = sc,
- numExecutors = 2,
- timeout = 30000) // 30s
+
val sleepyRdd = input.mapPartitions{ x =>
Thread.sleep(5000) // 5s
x
@@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with
LocalSparkContext {
execs.foreach(execId => sched.decommissionExecutor(execId,
ExecutorDecommissionInfo("", false)))
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
- // Try and launch task after decommissioning, this should fail
- val postDecommissioned = input.map(x => x)
- val postDecomAsyncCount = postDecommissioned.countAsync()
- val thrown = intercept[java.util.concurrent.TimeoutException]{
- val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
- }
- assert(postDecomAsyncCount.isCompleted === false,
- "After exec decommission new task could not launch")
}
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 6a52f72..25145da 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue,
Semaphore}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue,
Semaphore, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -69,9 +69,9 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
- // Just replicate blocks as fast as we can during testing, there isn't
another
+ // Just replicate blocks quickly during testing, there isn't another
// workload we need to worry about.
- .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
if (whenToDecom == TaskStarted) {
// We are using accumulators below, make sure those are reported
frequently.
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
val execIdToBlocksMapping = storageStatus.map(
status => (status.blockManagerId.executorId, status.blocks)).toMap
// No cached blocks should be present on executor which was decommissioned
-
assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq ===
Seq(),
+ assert(
+ !execIdToBlocksMapping.contains(execToDecommission) ||
+ execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq ===
Seq(),
"Cache blocks should be migrated")
if (persist) {
// There should still be all the RDD blocks cached
assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) ===
numParts)
}
- // Make the executor we decommissioned exit
- sched.client.killExecutors(List(execToDecommission))
-
- // Wait for the executor to be removed
- executorRemovedSem.acquire(1)
+ // Wait for the executor to be removed automatically after migration.
+ assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))
// Since the RDD is cached or shuffled so further usage of same RDD should
use the
// cached data. Original RDD partitions should not be recomputed i.e. accum
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index 41b68d5..74ad8bd 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import scala.concurrent.duration._
import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.must.Matchers
@@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite
with Matchers {
private val sparkConf = new SparkConf(false)
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
+ // Just replicate blocks quickly during testing, as there isn't another
+ // workload we need to worry about.
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
private def registerShuffleBlocks(
mockMigratableShuffleResolver: MigratableResolver,
@@ -54,6 +57,113 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateDecommissionTimestamps(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decommissioning manager timestamps and status
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime: Option[Long] = None
+ try {
+ bmDecomManager.start()
+ eventually(timeout(100.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail) {
+ previousTime match {
+ case None =>
+ previousTime = Some(currentTime)
+ assert(false)
+ case Some(t) =>
+ assert(t < currentTime)
+ }
+ } else {
+ // If we expect migration to fail we should get the max value
quickly.
+ assert(currentTime === Long.MaxValue)
+ }
+ }
+ if (!fail) {
+ // Wait 5 seconds and assert times keep moving forward.
+ Thread.sleep(5000)
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done && currentTime > previousTime.get)
+ }
+ } finally {
+ bmDecomManager.stop()
+ }
+ }
+
+ test("test that with no blocks we finish migration") {
+ // Set up the mocks so we return empty
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ when(migratableShuffleBlockResolver.getStoredShuffles())
+ .thenReturn(Seq())
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+ // Verify the decom manager handles this correctly
+ validateDecommissionTimestamps(sparkConf, bm,
migratableShuffleBlockResolver)
+ }
+
+ test("block decom manager with no migrations configured") {
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+ val badConf = new SparkConf(false)
+ .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, false)
+ .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, false)
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
+ // Verify the decom manager handles this correctly
+ validateDecommissionTimestamps(badConf, bm, migratableShuffleBlockResolver,
+ fail = true)
+ }
+
+ test("block decom manager with no peers") {
+ // Set up the mocks so we return one shuffle block
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq())
+
+ // Verify the decom manager handles this correctly
+ validateDecommissionTimestamps(sparkConf, bm,
migratableShuffleBlockResolver,
+ fail = true)
+ }
+
+
+ test("block decom manager with only shuffle files time moves forward") {
+ // Set up the mocks so we return one shuffle block
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+ // Verify the decom manager handles this correctly
+ validateDecommissionTimestamps(sparkConf, bm,
migratableShuffleBlockResolver)
+ }
+
test("test shuffle and cached rdd migration without any error") {
val blockTransferService = mock(classOf[BlockTransferService])
val bm = mock(classOf[BlockManager])
@@ -77,13 +187,36 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
try {
bmDecomManager.start()
- eventually(timeout(5.second), interval(10.milliseconds)) {
+ var previousRDDTime: Option[Long] = None
+ var previousShuffleTime: Option[Long] = None
+
+ // We don't check that all blocks are migrated because our mock is
always returning an RDD.
+ eventually(timeout(100.second), interval(10.milliseconds)) {
assert(bmDecomManager.shufflesToMigrate.isEmpty == true)
- verify(bm, times(1)).replicateBlock(
+ verify(bm, least(1)).replicateBlock(
mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
verify(blockTransferService, times(2))
.uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"),
mc.any(), mc.any(),
mc.eq(StorageLevel.DISK_ONLY), mc.isNull())
+ // Since we never "finish" the RDD blocks, make sure the time is
always moving forward.
+ assert(bmDecomManager.rddBlocksLeft)
+ previousRDDTime match {
+ case None =>
+ previousRDDTime = Some(bmDecomManager.lastRDDMigrationTime)
+ assert(false)
+ case Some(t) =>
+ assert(bmDecomManager.lastRDDMigrationTime > t)
+ }
+ // Since we do eventually finish the shuffle blocks make sure the
shuffle blocks complete
+ // and that the time keeps moving forward.
+ assert(!bmDecomManager.shuffleBlocksLeft)
+ previousShuffleTime match {
+ case None =>
+ previousShuffleTime = Some(bmDecomManager.lastShuffleMigrationTime)
+ assert(false)
+ case Some(t) =>
+ assert(bmDecomManager.lastShuffleMigrationTime > t)
+ }
}
} finally {
bmDecomManager.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]