This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5789b4e35 [CELEBORN-2217] Use a separate thread to handle
RpcEndpointVerifier messages
5789b4e35 is described below
commit 5789b4e35786bb62964042f818f5c32a76bf259e
Author: TheodoreLx <[email protected]>
AuthorDate: Sun Dec 14 21:29:17 2025 +0800
[CELEBORN-2217] Use a separate thread to handle RpcEndpointVerifier messages
### What changes were proposed in this pull request?
An EndpointVerifierMessageLoop is introduced to specifically handle
RpcEndpointVerifier messages, and a separate thread is used to execute the
EndpointVerifierMessageLoop.
### Why are the changes needed?
RpcEndpointVerifier.CheckExistence is a high-priority message that needs to
be processed first. In the original model, if the LifecycleManager's RPC
message queue accumulates a large backlog, RpcEndpointVerifier.CheckExistence
messages cannot be processed immediately, leading to numerous ShuffleClient
initialization failures and causing task failures.
### Does this PR resolve a correctness bug?
No
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster Test
Closes #3554 from TheodoreLx/verifier-first.
Authored-by: TheodoreLx <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 22 +++++++
.../celeborn/common/rpc/netty/Dispatcher.scala | 71 ++++++++++++++++++++--
.../celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +-
.../common/rpc/netty/NettyRpcEnvSuite.scala | 18 ++++++
docs/configuration/network.md | 1 +
5 files changed, 109 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c863a5f6d..7c1cc6875 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -30,6 +30,7 @@ import scala.util.matching.Regex
import io.netty.channel.epoll.Epoll
import io.netty.channel.kqueue.KQueue
+import
org.apache.celeborn.common.CelebornConf.ENDPOINT_VERIFIER_SEPARATE_ENABLED
import
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
import org.apache.celeborn.common.client.{ApplicationInfoProvider,
DefaultApplicationInfoProvider}
import org.apache.celeborn.common.identity.{DefaultIdentityProvider,
HadoopBasedIdentityProvider, IdentityProvider}
@@ -1016,6 +1017,8 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)
+ def endpointVerifierSeparateEnabled: Boolean =
get(ENDPOINT_VERIFIER_SEPARATE_ENABLED)
+
// //////////////////////////////////////////////////////
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
@@ -1488,6 +1491,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def testPushReplicaDataTimeout: Boolean =
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
+ def testProcessEndpointVerifierSeparate: Boolean =
get(TEST_RPC_ENDPOINT_VERIFIER_SEPARATE)
def clientFlinkMemoryPerResultPartition: Long =
get(CLIENT_MEMORY_PER_RESULT_PARTITION)
def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
def clientFlinkNumConcurrentReading: Int =
get(CLIENT_NUM_CONCURRENT_READINGS)
@@ -6795,4 +6799,22 @@ object CelebornConf extends Logging {
.doc("Whether to mark shuffle data lost when unknown worker is
detected.")
.booleanConf
.createWithDefault(false)
+
+ val ENDPOINT_VERIFIER_SEPARATE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.rpc.RpcEndpointVerifier.separate.enabled")
+ .categories("network")
+ .version("0.7.0")
+ .doc("Whether to enable dispatcher process RpcEndpointVerifier's request
separately.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val TEST_RPC_ENDPOINT_VERIFIER_SEPARATE: ConfigEntry[Boolean] =
+ buildConf("celeborn.test.RpcEndpointVerifier.separate.enabled")
+ .internal
+ .categories("test", "rpc")
+ .doc("test to process RpcEndpointVerifier's request separately")
+ .version("0.7.0")
+ .booleanConf
+ .createWithDefault(false)
+
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index c93203c36..d8282a64c 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -51,6 +51,14 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
// Track the receivers whose inboxes may contain messages.
private val receivers = new LinkedBlockingQueue[EndpointData]
+ private val endpointVerifierSeparate: Boolean =
+ nettyEnv.celebornConf.endpointVerifierSeparateEnabled
+ private val testProcessEndpointVerifierSeparate: Boolean =
+ nettyEnv.celebornConf.testProcessEndpointVerifierSeparate
+ private var endpointVerifierMessageLoopRunning: Boolean = true
+ private var rpcEndpointVerifier: EndpointData = _
+ private val CURRENT_RUNNABLE: ThreadLocal[Runnable] = new
ThreadLocal[Runnable]
+ private[netty] var testProcessEndpointVerifierSeparateResult: Boolean = false
/**
* True if the dispatcher has been stopped. Once stopped, all messages
posted will be bounced
@@ -71,7 +79,11 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
- receivers.offer(data) // for the OnStart message
+ if (endpointVerifierSeparate && RpcEndpointVerifier.NAME.equals(name)) {
+ rpcEndpointVerifier = data
+ } else {
+ receivers.offer(data) // for the OnStart message
+ }
}
endpointRef
}
@@ -174,7 +186,11 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
Some(new CelebornException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
- receivers.offer(data)
+ if (endpointVerifierSeparate &&
RpcEndpointVerifier.NAME.equals(endpointName)) {
+ // RpcEndpointVerifier's messages will be processed by
EndpointVerifierMessageLoop, so do nothing here
+ } else {
+ receivers.offer(data)
+ }
None
}
}
@@ -193,6 +209,9 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
// Enqueue a message that tells the message loops to stop.
receivers.offer(PoisonPill)
+ if (endpointVerifierSeparate) {
+ endpointVerifierMessageLoopRunning = false
+ }
threadpool.shutdown()
}
@@ -204,6 +223,10 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
* Return if the endpoint exists
*/
def verify(name: String): Boolean = {
+ if (testProcessEndpointVerifierSeparate &&
!testProcessEndpointVerifierSeparateResult && CURRENT_RUNNABLE.get() != null &&
CURRENT_RUNNABLE.get().isInstanceOf[
+ EndpointVerifierMessageLoop]) {
+ testProcessEndpointVerifierSeparateResult = true
+ }
endpoints.containsKey(name)
}
@@ -218,8 +241,16 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads,
"celeborn-dispatcher")
logInfo(s"Celeborn dispatcher numThreads: $numThreads")
- for (i <- 0 until numThreads) {
- pool.execute(new MessageLoop)
+ if (endpointVerifierSeparate) {
+ for (i <- 0 until numThreads - 1) {
+ pool.execute(new MessageLoop)
+ }
+ pool.execute(new EndpointVerifierMessageLoop)
+ logInfo("EndpointVerifierMessageLoop started")
+ } else {
+ for (i <- 0 until numThreads) {
+ pool.execute(new MessageLoop)
+ }
}
pool
}
@@ -255,6 +286,38 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
}
}
+ /** Message loop dedicated to processing RpcEndpointVerifier messages. */
+ private class EndpointVerifierMessageLoop extends Runnable {
+ override def run(): Unit = {
+ try {
+ CURRENT_RUNNABLE.set(this)
+ while (endpointVerifierMessageLoopRunning) {
+ if (rpcEndpointVerifier == null ||
rpcEndpointVerifier.inbox.isEmpty) {
+ TimeUnit.MILLISECONDS.sleep(50)
+ } else {
+ try {
+ rpcEndpointVerifier.inbox.process(Dispatcher.this)
+ } catch {
+ case NonFatal(e) => logError(e.getMessage, e)
+ }
+ }
+ }
+ } catch {
+ case _: InterruptedException => // exit
+ case t: Throwable =>
+ try {
+ // Re-submit an EndpointVerifierMessageLoop so that Dispatcher will still
work if
+ // UncaughtExceptionHandler decides to not kill JVM.
+ threadpool.execute(new EndpointVerifierMessageLoop)
+ } finally {
+ throw t
+ }
+ } finally {
+ CURRENT_RUNNABLE.remove()
+ }
+ }
+ }
+
/** A poison endpoint that indicates MessageLoop should exit its message
loop. */
private val PoisonPill = new EndpointData(null, null, null)
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index c0187f698..ec7219491 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -57,7 +57,7 @@ class NettyRpcEnv(
private val _rpcSource: RpcSource = new RpcSource(celebornConf, config.role)
- private val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)
+ private[netty] val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)
private var worker: RpcEndpoint = null
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
index bd469591c..f42ffab4c 100644
---
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
@@ -162,4 +162,22 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits
{
anotherEnv.awaitTermination()
}
}
+
+ test("test process RpcEndpointVerifier separately") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.TEST_RPC_ENDPOINT_VERIFIER_SEPARATE, true)
+ val localEnv = createRpcEnv(conf, "local", 0, clientMode = true)
+ val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
+ try {
+ localEnv.setupEndpointRef(anotherEnv.address, RpcEndpointVerifier.NAME)
+ assert(
+
anotherEnv.asInstanceOf[NettyRpcEnv].dispatcher.testProcessEndpointVerifierSeparateResult)
+ } finally {
+ anotherEnv.shutdown()
+ anotherEnv.awaitTermination()
+ localEnv.shutdown()
+ localEnv.awaitTermination()
+ }
+ }
+
}
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 2cdad5aa0..44c06ddf1 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -50,6 +50,7 @@ license: |
| celeborn.network.memory.allocator.verbose.metric | false | false | Whether
to enable verbose metric for pooled allocator. | 0.3.0 | |
| celeborn.network.timeout | 240s | false | Default timeout for network
operations. | 0.2.0 | |
| celeborn.port.maxRetries | 1 | false | When port is occupied, we will retry
for max retry times. | 0.2.0 | |
+| celeborn.rpc.RpcEndpointVerifier.separate.enabled | true | false | Whether
to enable dispatcher process RpcEndpointVerifier's request separately. | 0.7.0
| |
| celeborn.rpc.askTimeout | 60s | false | Timeout for RPC ask operations. It's
recommended to set at least `240s` when `HDFS` is enabled in
`celeborn.storage.availableTypes` | 0.2.0 | |
| celeborn.rpc.connect.threads | 64 | false | | 0.2.0 | |
| celeborn.rpc.dispatcher.threads | 0 | false | Threads number of message
dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 |
celeborn.rpc.dispatcher.numThreads |