This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new aa6784c1bcdd Revert "[SPARK-47702][CORE] Remove Shuffle service 
endpoint from the locations list when RDD block is removed form a node"
aa6784c1bcdd is described below

commit aa6784c1bcdd340d156463e5cedc59343dc53f4c
Author: yangjie01 <[email protected]>
AuthorDate: Fri Oct 4 10:29:54 2024 -0700

    Revert "[SPARK-47702][CORE] Remove Shuffle service endpoint from the 
locations list when RDD block is removed form a node"
    
    ### What changes were proposed in this pull request?
    This reverts commit ec281547eed4cee1dab2b777b4a03c764bd15b54.
    
    ### Why are the changes needed?
    branch-3.5 cannot be compiled successfully with commit 
ec281547eed4cee1dab2b777b4a03c764bd15b54
    
    ```
    [error] 
/home/runner/work/spark/spark/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala:304:23:
 value TEST_SKIP_ESS_REGISTER is not a member of object 
org.apache.spark.internal.config.Tests
    [error]     newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
    [error]                       ^
    [error] one error found
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GItHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #48353 from LuciferYang/Revert-SPARK-47702.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/storage/BlockManager.scala    |  6 ++--
 .../storage/BlockManagerReplicationSuite.scala     | 37 ----------------------
 .../apache/spark/storage/BlockManagerSuite.scala   | 13 +++-----
 3 files changed, 7 insertions(+), 49 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1b56aa7ade12..6de6069d2fea 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2082,10 +2082,8 @@ private[spark] class BlockManager(
       hasRemoveBlock = true
       if (tellMaster) {
         // Only update storage level from the captured block status before 
deleting, so that
-        // memory size and disk size are being kept for calculating delta. 
Reset the replica
-        // count 0 in storage level to notify that it is a remove operation.
-        val storageLevel = StorageLevel(blockStatus.get.storageLevel.toInt, 0)
-        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = 
storageLevel))
+        // memory size and disk size are being kept for calculating delta.
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = 
StorageLevel.NONE))
       }
     } finally {
       if (!hasRemoveBlock) {
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d2fa3b4158d8..38a669bc8574 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,8 +38,6 @@ import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
-import org.apache.spark.network.shuffle.ExternalBlockStoreClient
-import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
@@ -297,41 +295,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
-  test("Test block location after replication with 
SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
-    val newConf = conf.clone()
-    newConf.set(SHUFFLE_SERVICE_ENABLED, true)
-    newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
-    newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
-    val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]()
-    val shuffleClient = Some(new ExternalBlockStoreClient(
-        new TransportConf("shuffle", MapConfigProvider.EMPTY),
-        null, false, 5000))
-    master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2",
-      new BlockManagerMasterEndpoint(rpcEnv, true, newConf,
-        new LiveListenerBus(newConf), shuffleClient, blockManagerInfo, 
mapOutputTracker,
-        sc.env.shuffleManager, isDriver = true)),
-      rpcEnv.setupEndpoint("blockmanagerHeartbeat-2",
-      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, 
blockManagerInfo)), newConf, true)
-
-    val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT)
-    val store1 = makeBlockManager(10000, "host-1")
-    val store2 = makeBlockManager(10000, "host-2")
-    assert(master.getPeers(store1.blockManagerId).toSet === 
Set(store2.blockManagerId))
-
-    val blockId = RDDBlockId(1, 2)
-    val message = new Array[Byte](1000)
-
-    // if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port 
should be present.
-    store1.putSingle(blockId, message, StorageLevel.DISK_ONLY)
-    assert(master.getLocations(blockId).contains(
-      BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
-
-    // after block is removed, shuffle port should be removed.
-    store1.removeBlock(blockId, true)
-    assert(!master.getLocations(blockId).contains(
-      BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
-  }
-
   test("block replication - addition and deletion of block managers") {
     val blockSize = 1000
     val storeSize = 10000
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 728e3a252b7a..ecd66dc2c5fb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -33,7 +33,7 @@ import scala.reflect.classTag
 import com.esotericsoftware.kryo.KryoException
 import org.apache.commons.lang3.RandomUtils
 import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
-import org.mockito.Mockito.{atLeastOnce, doAnswer, mock, never, spy, times, 
verify, when}
+import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
 import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
@@ -666,7 +666,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with PrivateMethodTe
       removedFromMemory: Boolean,
       removedFromDisk: Boolean): Unit = {
     def assertSizeReported(captor: ArgumentCaptor[Long], expectRemoved: 
Boolean): Unit = {
-      assert(captor.getAllValues().size() >= 1)
+      assert(captor.getAllValues().size() === 1)
       if (expectRemoved) {
         assert(captor.getValue() > 0)
       } else {
@@ -676,18 +676,15 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
 
     val memSizeCaptor = 
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
     val diskSizeCaptor = 
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
-    val storageLevelCaptor =
-      
ArgumentCaptor.forClass(classOf[StorageLevel]).asInstanceOf[ArgumentCaptor[StorageLevel]]
-    verify(master, atLeastOnce()).updateBlockInfo(mc.eq(store.blockManagerId), 
mc.eq(blockId),
-      storageLevelCaptor.capture(), memSizeCaptor.capture(), 
diskSizeCaptor.capture())
+    verify(master).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
+      mc.eq(StorageLevel.NONE), memSizeCaptor.capture(), 
diskSizeCaptor.capture())
     assertSizeReported(memSizeCaptor, removedFromMemory)
     assertSizeReported(diskSizeCaptor, removedFromDisk)
-    assert(storageLevelCaptor.getValue.replication == 0)
   }
 
   private def assertUpdateBlockInfoNotReported(store: BlockManager, blockId: 
BlockId): Unit = {
     verify(master, never()).updateBlockInfo(mc.eq(store.blockManagerId), 
mc.eq(blockId),
-      mc.any[StorageLevel](), mc.anyInt(), mc.anyInt())
+      mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
   test("reregistration on heart beat") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to