This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new ca8407b97338 [SPARK-47702][CORE][3.5] Remove Shuffle service endpoint
from the locations list when RDD block is removed from a node
ca8407b97338 is described below
commit ca8407b9733882fd28a6f94fd88aef27f3b648d0
Author: maheshbehera <[email protected]>
AuthorDate: Sat Oct 5 12:56:52 2024 +0800
[SPARK-47702][CORE][3.5] Remove Shuffle service endpoint from the locations
list when RDD block is removed from a node
Credit to maheshk114 for the initial investigation and the fix.
This PR fixes a bug where the shuffle service's ID is kept in the block
location list when an RDD block is removed from a node. Before this change
`StorageLevel.NONE` is used to notify about the block remove which causes the
block manager master ignoring the update of the locations for shuffle service's
IDs (for details please see the method
`BlockManagerMasterEndpoint#updateBlockInfo()` and keep in mind
`StorageLevel.NONE.useDisk` is `false`). But after this change only the replic
[...]
If the block location is not updated properly, then tasks fail with a fetch
failed exception. The tasks will try to read the RDD blocks from a node using
the external shuffle service. The read will fail if the node is already
decommissioned.
```
WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0
(TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4,
vm-92303839, 7337, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult:
at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
at
org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
at
org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
at scala.Option.orElse(Option.scala:447)
at
org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
at
org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
at
org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)
```
No
Added a new UT.
No
Closes #48357 from attilapiros/SPARK-47702-Spark3.5.
Authored-by: maheshbehera <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../org/apache/spark/storage/BlockManager.scala | 6 ++--
.../storage/BlockManagerReplicationSuite.scala | 36 ++++++++++++++++++++++
.../apache/spark/storage/BlockManagerSuite.scala | 13 +++++---
3 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6de6069d2fea..1b56aa7ade12 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2082,8 +2082,10 @@ private[spark] class BlockManager(
hasRemoveBlock = true
if (tellMaster) {
// Only update storage level from the captured block status before
deleting, so that
- // memory size and disk size are being kept for calculating delta.
- reportBlockStatus(blockId, blockStatus.get.copy(storageLevel =
StorageLevel.NONE))
+ // memory size and disk size are being kept for calculating delta.
Reset the replica
+ // count 0 in storage level to notify that it is a remove operation.
+ val storageLevel = StorageLevel(blockStatus.get.storageLevel.toInt, 0)
+ reportBlockStatus(blockId, blockStatus.get.copy(storageLevel =
storageLevel))
}
} finally {
if (!hasRemoveBlock) {
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 38a669bc8574..29526684c3e9 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,6 +38,8 @@ import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.shuffle.ExternalBlockStoreClient
+import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
@@ -295,6 +297,40 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
}
}
+ test("Test block location after replication with
SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
+ val newConf = conf.clone()
+ newConf.set(SHUFFLE_SERVICE_ENABLED, true)
+ newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+ val blockManagerInfo = new mutable.HashMap[BlockManagerId,
BlockManagerInfo]()
+ val shuffleClient = Some(new ExternalBlockStoreClient(
+ new TransportConf("shuffle", MapConfigProvider.EMPTY),
+ null, false, 5000))
+ master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2",
+ new BlockManagerMasterEndpoint(rpcEnv, true, newConf,
+ new LiveListenerBus(newConf), shuffleClient, blockManagerInfo,
mapOutputTracker,
+ sc.env.shuffleManager, isDriver = true)),
+ rpcEnv.setupEndpoint("blockmanagerHeartbeat-2",
+ new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true,
blockManagerInfo)), newConf, true)
+
+ val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT)
+ val store1 = makeBlockManager(10000, "host-1")
+ val store2 = makeBlockManager(10000, "host-2")
+ assert(master.getPeers(store1.blockManagerId).toSet ===
Set(store2.blockManagerId))
+
+ val blockId = RDDBlockId(1, 2)
+ val message = new Array[Byte](1000)
+
+ // if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port
should be present.
+ store1.putSingle(blockId, message, StorageLevel.DISK_ONLY)
+ assert(master.getLocations(blockId).contains(
+ BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
+
+ // after block is removed, shuffle port should be removed.
+ store1.removeBlock(blockId, true)
+ assert(!master.getLocations(blockId).contains(
+ BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
+ }
+
test("block replication - addition and deletion of block managers") {
val blockSize = 1000
val storeSize = 10000
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ecd66dc2c5fb..728e3a252b7a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -33,7 +33,7 @@ import scala.reflect.classTag
import com.esotericsoftware.kryo.KryoException
import org.apache.commons.lang3.RandomUtils
import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
-import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
+import org.mockito.Mockito.{atLeastOnce, doAnswer, mock, never, spy, times,
verify, when}
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.Eventually._
@@ -666,7 +666,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with PrivateMethodTe
removedFromMemory: Boolean,
removedFromDisk: Boolean): Unit = {
def assertSizeReported(captor: ArgumentCaptor[Long], expectRemoved:
Boolean): Unit = {
- assert(captor.getAllValues().size() === 1)
+ assert(captor.getAllValues().size() >= 1)
if (expectRemoved) {
assert(captor.getValue() > 0)
} else {
@@ -676,15 +676,18 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
val memSizeCaptor =
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
val diskSizeCaptor =
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
- verify(master).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
- mc.eq(StorageLevel.NONE), memSizeCaptor.capture(),
diskSizeCaptor.capture())
+ val storageLevelCaptor =
+
ArgumentCaptor.forClass(classOf[StorageLevel]).asInstanceOf[ArgumentCaptor[StorageLevel]]
+ verify(master, atLeastOnce()).updateBlockInfo(mc.eq(store.blockManagerId),
mc.eq(blockId),
+ storageLevelCaptor.capture(), memSizeCaptor.capture(),
diskSizeCaptor.capture())
assertSizeReported(memSizeCaptor, removedFromMemory)
assertSizeReported(diskSizeCaptor, removedFromDisk)
+ assert(storageLevelCaptor.getValue.replication == 0)
}
private def assertUpdateBlockInfoNotReported(store: BlockManager, blockId:
BlockId): Unit = {
verify(master, never()).updateBlockInfo(mc.eq(store.blockManagerId),
mc.eq(blockId),
- mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
+ mc.any[StorageLevel](), mc.anyInt(), mc.anyInt())
}
test("reregistration on heart beat") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]