This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new aa6784c1bcdd Revert "[SPARK-47702][CORE] Remove Shuffle service
endpoint from the locations list when RDD block is removed from a node"
aa6784c1bcdd is described below
commit aa6784c1bcdd340d156463e5cedc59343dc53f4c
Author: yangjie01 <[email protected]>
AuthorDate: Fri Oct 4 10:29:54 2024 -0700
Revert "[SPARK-47702][CORE] Remove Shuffle service endpoint from the
locations list when RDD block is removed from a node"
### What changes were proposed in this pull request?
This reverts commit ec281547eed4cee1dab2b777b4a03c764bd15b54.
### Why are the changes needed?
branch-3.5 cannot be compiled successfully with commit
ec281547eed4cee1dab2b777b4a03c764bd15b54
```
[error]
/home/runner/work/spark/spark/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala:304:23:
value TEST_SKIP_ESS_REGISTER is not a member of object
org.apache.spark.internal.config.Tests
[error] newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
[error] ^
[error] one error found
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48353 from LuciferYang/Revert-SPARK-47702.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/storage/BlockManager.scala | 6 ++--
.../storage/BlockManagerReplicationSuite.scala | 37 ----------------------
.../apache/spark/storage/BlockManagerSuite.scala | 13 +++-----
3 files changed, 7 insertions(+), 49 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1b56aa7ade12..6de6069d2fea 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2082,10 +2082,8 @@ private[spark] class BlockManager(
hasRemoveBlock = true
if (tellMaster) {
// Only update storage level from the captured block status before
deleting, so that
- // memory size and disk size are being kept for calculating delta.
Reset the replica
- // count 0 in storage level to notify that it is a remove operation.
- val storageLevel = StorageLevel(blockStatus.get.storageLevel.toInt, 0)
- reportBlockStatus(blockId, blockStatus.get.copy(storageLevel =
storageLevel))
+ // memory size and disk size are being kept for calculating delta.
+ reportBlockStatus(blockId, blockStatus.get.copy(storageLevel =
StorageLevel.NONE))
}
} finally {
if (!hasRemoveBlock) {
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d2fa3b4158d8..38a669bc8574 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,8 +38,6 @@ import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
-import org.apache.spark.network.shuffle.ExternalBlockStoreClient
-import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
@@ -297,41 +295,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
}
}
- test("Test block location after replication with
SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
- val newConf = conf.clone()
- newConf.set(SHUFFLE_SERVICE_ENABLED, true)
- newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
- newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
- val blockManagerInfo = new mutable.HashMap[BlockManagerId,
BlockManagerInfo]()
- val shuffleClient = Some(new ExternalBlockStoreClient(
- new TransportConf("shuffle", MapConfigProvider.EMPTY),
- null, false, 5000))
- master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2",
- new BlockManagerMasterEndpoint(rpcEnv, true, newConf,
- new LiveListenerBus(newConf), shuffleClient, blockManagerInfo,
mapOutputTracker,
- sc.env.shuffleManager, isDriver = true)),
- rpcEnv.setupEndpoint("blockmanagerHeartbeat-2",
- new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true,
blockManagerInfo)), newConf, true)
-
- val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT)
- val store1 = makeBlockManager(10000, "host-1")
- val store2 = makeBlockManager(10000, "host-2")
- assert(master.getPeers(store1.blockManagerId).toSet ===
Set(store2.blockManagerId))
-
- val blockId = RDDBlockId(1, 2)
- val message = new Array[Byte](1000)
-
- // if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port
should be present.
- store1.putSingle(blockId, message, StorageLevel.DISK_ONLY)
- assert(master.getLocations(blockId).contains(
- BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
-
- // after block is removed, shuffle port should be removed.
- store1.removeBlock(blockId, true)
- assert(!master.getLocations(blockId).contains(
- BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
- }
-
test("block replication - addition and deletion of block managers") {
val blockSize = 1000
val storeSize = 10000
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 728e3a252b7a..ecd66dc2c5fb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -33,7 +33,7 @@ import scala.reflect.classTag
import com.esotericsoftware.kryo.KryoException
import org.apache.commons.lang3.RandomUtils
import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
-import org.mockito.Mockito.{atLeastOnce, doAnswer, mock, never, spy, times,
verify, when}
+import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.Eventually._
@@ -666,7 +666,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with PrivateMethodTe
removedFromMemory: Boolean,
removedFromDisk: Boolean): Unit = {
def assertSizeReported(captor: ArgumentCaptor[Long], expectRemoved:
Boolean): Unit = {
- assert(captor.getAllValues().size() >= 1)
+ assert(captor.getAllValues().size() === 1)
if (expectRemoved) {
assert(captor.getValue() > 0)
} else {
@@ -676,18 +676,15 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
val memSizeCaptor =
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
val diskSizeCaptor =
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
- val storageLevelCaptor =
-
ArgumentCaptor.forClass(classOf[StorageLevel]).asInstanceOf[ArgumentCaptor[StorageLevel]]
- verify(master, atLeastOnce()).updateBlockInfo(mc.eq(store.blockManagerId),
mc.eq(blockId),
- storageLevelCaptor.capture(), memSizeCaptor.capture(),
diskSizeCaptor.capture())
+ verify(master).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
+ mc.eq(StorageLevel.NONE), memSizeCaptor.capture(),
diskSizeCaptor.capture())
assertSizeReported(memSizeCaptor, removedFromMemory)
assertSizeReported(diskSizeCaptor, removedFromDisk)
- assert(storageLevelCaptor.getValue.replication == 0)
}
private def assertUpdateBlockInfoNotReported(store: BlockManager, blockId:
BlockId): Unit = {
verify(master, never()).updateBlockInfo(mc.eq(store.blockManagerId),
mc.eq(blockId),
- mc.any[StorageLevel](), mc.anyInt(), mc.anyInt())
+ mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
}
test("reregistration on heart beat") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]