This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5789b4e35 [CELEBORN-2217] Use a separate thread to handle RpcEndpointVerifier messages
5789b4e35 is described below

commit 5789b4e35786bb62964042f818f5c32a76bf259e
Author: TheodoreLx <[email protected]>
AuthorDate: Sun Dec 14 21:29:17 2025 +0800

    [CELEBORN-2217] Use a separate thread to handle RpcEndpointVerifier messages
    
    ### What changes were proposed in this pull request?
    
    An EndpointVerifierMessageLoop is introduced to handle RpcEndpointVerifier messages exclusively, and it runs on its own dedicated dispatcher thread.
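    
    As a minimal, self-contained sketch of the idea (this is not the actual Dispatcher code; the queue and message types are hypothetical stand-ins), regular endpoints keep sharing one receiver queue while the verifier gets its own inbox drained by a dedicated loop, so a CheckExistence-style probe never waits behind a backlog:
    
    ```scala
    import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
    
    object SeparateVerifierLoopSketch {
      // Hypothetical stand-ins for the real endpoint/inbox types, used only to show the shape.
      final case class Msg(body: String)
    
      private val sharedReceivers = new LinkedBlockingQueue[Msg]() // all regular endpoints
      private val verifierInbox   = new LinkedBlockingQueue[Msg]() // RpcEndpointVerifier only
      @volatile private var running = true
    
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(2)
        // Regular message loop: drains the shared queue and may fall behind under load.
        pool.execute(() =>
          while (running)
            Option(sharedReceivers.poll(50, TimeUnit.MILLISECONDS)).foreach(m => println(s"regular loop: ${m.body}")))
        // Dedicated loop: only ever looks at the verifier inbox, mirroring EndpointVerifierMessageLoop.
        pool.execute(() =>
          while (running)
            Option(verifierInbox.poll(50, TimeUnit.MILLISECONDS)).foreach(m => println(s"verifier loop: ${m.body}")))
    
        (1 to 1000).foreach(i => sharedReceivers.put(Msg(s"slow message $i"))) // simulated backlog
        verifierInbox.put(Msg("CheckExistence"))                               // still handled promptly
        Thread.sleep(200)
        running = false
        pool.shutdown()
      }
    }
    ```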
    
    ### Why are the changes needed?
    
    RpcEndpointVerifier.CheckExistence is a high-priority message that must be processed promptly. In the original model, when the LifecycleManager's RPC message queue accumulates a large backlog, RpcEndpointVerifier.CheckExistence messages cannot be processed in time, leading to numerous ShuffleClient initialization failures and, in turn, task failures.
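    
    The new behavior is gated by a switch added to CelebornConf (see the diff below). A minimal sketch of flipping it off programmatically, using the entry and accessor introduced by this commit (the wrapping object is only for illustration):
    
    ```scala
    import org.apache.celeborn.common.CelebornConf
    
    object VerifierSeparateToggleSketch {
      def main(args: Array[String]): Unit = {
        val conf = new CelebornConf()
        // celeborn.rpc.RpcEndpointVerifier.separate.enabled defaults to true;
        // setting it to false falls back to the shared message loops.
        conf.set(CelebornConf.ENDPOINT_VERIFIER_SEPARATE_ENABLED, false)
        assert(!conf.endpointVerifierSeparateEnabled)
      }
    }
    ```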
    
    ### Does this PR resolve a correctness bug?
    
    No
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Cluster Test
    
    Closes #3554 from TheodoreLx/verifier-first.
    
    Authored-by: TheodoreLx <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 22 +++++++
 .../celeborn/common/rpc/netty/Dispatcher.scala     | 71 ++++++++++++++++++++--
 .../celeborn/common/rpc/netty/NettyRpcEnv.scala    |  2 +-
 .../common/rpc/netty/NettyRpcEnvSuite.scala        | 18 ++++++
 docs/configuration/network.md                      |  1 +
 5 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c863a5f6d..7c1cc6875 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -30,6 +30,7 @@ import scala.util.matching.Regex
 import io.netty.channel.epoll.Epoll
 import io.netty.channel.kqueue.KQueue
 
+import org.apache.celeborn.common.CelebornConf.ENDPOINT_VERIFIER_SEPARATE_ENABLED
 import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
 import org.apache.celeborn.common.client.{ApplicationInfoProvider, DefaultApplicationInfoProvider}
 import org.apache.celeborn.common.identity.{DefaultIdentityProvider, HadoopBasedIdentityProvider, IdentityProvider}
@@ -1016,6 +1017,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
       get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
       CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)
 
+  def endpointVerifierSeparateEnabled: Boolean = get(ENDPOINT_VERIFIER_SEPARATE_ENABLED)
+
   // //////////////////////////////////////////////////////
   //               Shuffle Client Fetch                  //
   // //////////////////////////////////////////////////////
@@ -1488,6 +1491,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def testPushReplicaDataTimeout: Boolean = get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
   def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
   def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
+  def testProcessEndpointVerifierSeparate: Boolean = get(TEST_RPC_ENDPOINT_VERIFIER_SEPARATE)
   def clientFlinkMemoryPerResultPartition: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION)
   def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
   def clientFlinkNumConcurrentReading: Int = get(CLIENT_NUM_CONCURRENT_READINGS)
@@ -6795,4 +6799,22 @@ object CelebornConf extends Logging {
       .doc("Whether to mark shuffle data lost when unknown worker is 
detected.")
       .booleanConf
       .createWithDefault(false)
+
+  val ENDPOINT_VERIFIER_SEPARATE_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.rpc.RpcEndpointVerifier.separate.enabled")
+      .categories("network")
+      .version("0.7.0")
+      .doc("Whether to enable dispatcher process RpcEndpointVerifier's request 
separately.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val TEST_RPC_ENDPOINT_VERIFIER_SEPARATE: ConfigEntry[Boolean] =
+    buildConf("celeborn.test.RpcEndpointVerifier.separate.enabled")
+      .internal
+      .categories("test", "rpc")
+      .doc("test to process RpcEndpointVerifier's request separately")
+      .version("0.7.0")
+      .booleanConf
+      .createWithDefault(false)
+
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index c93203c36..d8282a64c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -51,6 +51,14 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
 
   // Track the receivers whose inboxes may contain messages.
   private val receivers = new LinkedBlockingQueue[EndpointData]
+  private val endpointVerifierSeparate: Boolean =
+    nettyEnv.celebornConf.endpointVerifierSeparateEnabled
+  private val testProcessEndpointVerifierSeparate: Boolean =
+    nettyEnv.celebornConf.testProcessEndpointVerifierSeparate
+  private var endpointVerifierMessageLoopRunning: Boolean = true
+  private var rpcEndpointVerifier: EndpointData = _
+  private val CURRENT_RUNNABLE: ThreadLocal[Runnable] = new ThreadLocal[Runnable]
+  private[netty] var testProcessEndpointVerifierSeparateResult: Boolean = false
 
   /**
   * True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced
@@ -71,7 +79,11 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
       }
       val data = endpoints.get(name)
       endpointRefs.put(data.endpoint, data.ref)
-      receivers.offer(data) // for the OnStart message
+      if (endpointVerifierSeparate && RpcEndpointVerifier.NAME.equals(name)) {
+        rpcEndpointVerifier = data
+      } else {
+        receivers.offer(data) // for the OnStart message
+      }
     }
     endpointRef
   }
@@ -174,7 +186,11 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
         Some(new CelebornException(s"Could not find $endpointName."))
       } else {
         data.inbox.post(message)
-        receivers.offer(data)
+        if (endpointVerifierSeparate && RpcEndpointVerifier.NAME.equals(endpointName)) {
+          // RpcEndpointVerifier's messages are processed by EndpointVerifierMessageLoop, so do nothing here
+        } else {
+          receivers.offer(data)
+        }
         None
       }
     }
@@ -193,6 +209,9 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
     endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
     // Enqueue a message that tells the message loops to stop.
     receivers.offer(PoisonPill)
+    if (endpointVerifierSeparate) {
+      endpointVerifierMessageLoopRunning = false
+    }
     threadpool.shutdown()
   }
 
@@ -204,6 +223,10 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
    * Return if the endpoint exists
    */
   def verify(name: String): Boolean = {
+    if (testProcessEndpointVerifierSeparate && !testProcessEndpointVerifierSeparateResult && CURRENT_RUNNABLE.get() != null && CURRENT_RUNNABLE.get().isInstanceOf[
+        EndpointVerifierMessageLoop]) {
+      testProcessEndpointVerifierSeparateResult = true
+    }
     endpoints.containsKey(name)
   }
 
@@ -218,8 +241,16 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
 
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "celeborn-dispatcher")
     logInfo(s"Celeborn dispatcher numThreads: $numThreads")
-    for (i <- 0 until numThreads) {
-      pool.execute(new MessageLoop)
+    if (endpointVerifierSeparate) {
+      for (i <- 0 until numThreads - 1) {
+        pool.execute(new MessageLoop)
+      }
+      pool.execute(new EndpointVerifierMessageLoop)
+      logInfo("EndpointVerifierMessageLoop started")
+    } else {
+      for (i <- 0 until numThreads) {
+        pool.execute(new MessageLoop)
+      }
     }
     pool
   }
@@ -255,6 +286,38 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource)
     }
   }
 
+  /** Message loop dedicated to dispatching RpcEndpointVerifier messages. */
+  private class EndpointVerifierMessageLoop extends Runnable {
+    override def run(): Unit = {
+      try {
+        CURRENT_RUNNABLE.set(this)
+        while (endpointVerifierMessageLoopRunning) {
+          if (rpcEndpointVerifier == null || rpcEndpointVerifier.inbox.isEmpty) {
+            TimeUnit.MILLISECONDS.sleep(50)
+          } else {
+            try {
+              rpcEndpointVerifier.inbox.process(Dispatcher.this)
+            } catch {
+              case NonFatal(e) => logError(e.getMessage, e)
+            }
+          }
+        }
+      } catch {
+        case _: InterruptedException => // exit
+        case t: Throwable =>
+          try {
+            // Re-submit an EndpointVerifierMessageLoop so that Dispatcher will still work if
+            // UncaughtExceptionHandler decides to not kill JVM.
+            threadpool.execute(new EndpointVerifierMessageLoop)
+          } finally {
+            throw t
+          }
+      } finally {
+        CURRENT_RUNNABLE.remove()
+      }
+    }
+  }
+
   /** A poison endpoint that indicates MessageLoop should exit its message loop. */
   private val PoisonPill = new EndpointData(null, null, null)
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index c0187f698..ec7219491 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -57,7 +57,7 @@ class NettyRpcEnv(
 
   private val _rpcSource: RpcSource = new RpcSource(celebornConf, config.role)
 
-  private val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)
+  private[netty] val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)
 
   private var worker: RpcEndpoint = null
 
diff --git a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
index bd469591c..f42ffab4c 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
@@ -162,4 +162,22 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       anotherEnv.awaitTermination()
     }
   }
+
+  test("test process RpcEndpointVerifier separately") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.TEST_RPC_ENDPOINT_VERIFIER_SEPARATE, true)
+    val localEnv = createRpcEnv(conf, "local", 0, clientMode = true)
+    val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
+    try {
+      localEnv.setupEndpointRef(anotherEnv.address, RpcEndpointVerifier.NAME)
+      assert(
+        anotherEnv.asInstanceOf[NettyRpcEnv].dispatcher.testProcessEndpointVerifierSeparateResult)
+    } finally {
+      anotherEnv.shutdown()
+      anotherEnv.awaitTermination()
+      localEnv.shutdown()
+      localEnv.awaitTermination()
+    }
+  }
+
 }
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 2cdad5aa0..44c06ddf1 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -50,6 +50,7 @@ license: |
 | celeborn.network.memory.allocator.verbose.metric | false | false | Whether to enable verbose metric for pooled allocator. | 0.3.0 |  | 
 | celeborn.network.timeout | 240s | false | Default timeout for network operations. | 0.2.0 |  | 
 | celeborn.port.maxRetries | 1 | false | When port is occupied, we will retry for max retry times. | 0.2.0 |  | 
+| celeborn.rpc.RpcEndpointVerifier.separate.enabled | true | false | Whether to process RpcEndpointVerifier requests in a separate dispatcher thread. | 0.7.0 |  | 
 | celeborn.rpc.askTimeout | 60s | false | Timeout for RPC ask operations. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.2.0 |  | 
 | celeborn.rpc.connect.threads | 64 | false |  | 0.2.0 |  | 
 | celeborn.rpc.dispatcher.threads | 0 | false | Threads number of message dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 | celeborn.rpc.dispatcher.numThreads | 
