This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 5e417ff8b14 [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
5e417ff8b14 is described below
commit 5e417ff8b14764d28ccc9561cb268de83b15eff2
Author: Frank Yin <[email protected]>
AuthorDate: Thu Aug 24 22:19:41 2023 -0700
[SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
### What changes were proposed in this pull request?
Fix bugs that makes the RDD decommissioner never finish
### Why are the changes needed?
The cached RDD decommissioner is in a forever retry loop when the only
viable peer is the fallback storage, which it doesn't know how to handle.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests are added and tested using Spark jobs.
Closes #42155 from ukby1234/franky.SPARK-44547.
Authored-by: Frank Yin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 47555da2ae292b07488ba181db1aceac8e7ddb3a)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/storage/BlockManagerDecommissioner.scala | 4 +--
.../BlockManagerDecommissionUnitSuite.scala | 35 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index ecd64b6695a..32df6388cab 100644
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -184,7 +184,7 @@ private[storage] class BlockManagerDecommissioner(
// Set if we encounter an error attempting to migrate and stop.
@volatile private var stopped = false
- @volatile private var stoppedRDD =
+ @volatile private[storage] var stoppedRDD =
!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
@volatile private var stoppedShuffle =
!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
@@ -204,7 +204,7 @@ private[storage] class BlockManagerDecommissioner(
logInfo("Attempting to migrate all RDD blocks")
while (!stopped && !stoppedRDD) {
// Validate if we have peers to migrate to. Otherwise, give up migration.
- if (bm.getPeers(false).isEmpty) {
+ if (!bm.getPeers(false).exists(_ != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)) {
logWarning("No available peers to receive RDD blocks, stop migration.")
stoppedRDD = true
} else {
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ac378b4c6..f9263f41060 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import scala.concurrent.duration._
import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, never, times, verify, when}
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.must.Matchers
@@ -305,4 +305,37 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
bmDecomManager.stop()
}
}
+
+ test("SPARK-44547: test cached rdd migration no available hosts") {
+ val blockTransferService = mock(classOf[BlockTransferService])
+ val bm = mock(classOf[BlockManager])
+
+ val storedBlockId1 = RDDBlockId(0, 0)
+ val storedBlock1 =
+ new ReplicateBlock(storedBlockId1, Seq(BlockManagerId("replicaHolder", "host1", bmPort)), 1)
+
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ registerShuffleBlocks(migratableShuffleBlockResolver, Set())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
+
+ when(bm.blockTransferService).thenReturn(blockTransferService)
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq(storedBlock1))
+
+ val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+
+ try {
+ bmDecomManager.start()
+ eventually(timeout(100.second), interval(10.milliseconds)) {
+ verify(bm, never()).replicateBlock(
+ mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
+ assert(bmDecomManager.rddBlocksLeft)
+ assert(bmDecomManager.stoppedRDD)
+ }
+ } finally {
+ bmDecomManager.stop()
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]