This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 38f3bdd37 [CELEBORN-1909] Support pre-run static code blocks of
TransportMessages to improve performance of protobuf serialization
38f3bdd37 is described below
commit 38f3bdd3759954d9a8fbbe95ae17ad155894cc33
Author: SteNicholas <[email protected]>
AuthorDate: Tue Mar 18 11:34:39 2025 +0800
[CELEBORN-1909] Support pre-run static code blocks of TransportMessages to
improve performance of protobuf serialization
### What changes were proposed in this pull request?
Pre-run the static code blocks of `TransportMessages` in the background to improve
the performance of protobuf serialization.
### Why are the changes needed?
The protobuf message protocol defines many `map` fields, which makes building
these message instances time-consuming. This is because `TransportMessages`
contains static code blocks that initialize a large number of `Descriptor`s and
`FieldAccessorTable`s, and instantiating a `FieldAccessorTable` involves
reflection. Testing shows that these static code blocks take about 70
milliseconds to execute.
Therefore, it's better to pre-run the static code blocks of `TransportMessages`
to improve the performance of protobuf serialization. Meanwhile, it's recommended
to use `repeated` instead of `map` fields for RPC messages.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3149 from SteNicholas/CELEBORN-1909.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
CONTRIBUTING.md | 3 ++
.../apache/celeborn/client/ShuffleClientImpl.java | 4 ++
.../apache/celeborn/client/LifecycleManager.scala | 4 ++
.../network/protocol/TransportMessagesHelper.java | 48 ++++++++++++++++++++++
.../celeborn/service/deploy/master/Master.scala | 5 ++-
.../celeborn/service/deploy/worker/Worker.scala | 4 ++
6 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3f5303260..0fbf570e3 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -45,6 +45,9 @@ There are already some further improvements on the schedule
and welcome to conta
When you add new RPC message, it's recommended to follow raw PB message case,
for example
`RegisterWorker` and `RegisterWorkerResponse`. The RPC messages will be
unified into raw PB messages eventually.
+### Using `repeated` instead of `map` fields in RPC Messages
+When adding fields to an RPC message, use a `repeated` field instead of a `map` field. `TransportMessages` contains static code blocks that initialize many `Descriptor`s and `FieldAccessorTable`s, and instantiating a `FieldAccessorTable` involves reflection, so the more `map` fields the RPC messages define, the more time-consuming building these messages becomes.
+
### Using Error Prone
Error Prone is a static analysis tool for Java that catches common programming
mistakes at compile-time.
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index cf48f95bc..be6b5aa3f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -56,6 +56,7 @@ import
org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.PushData;
import org.apache.celeborn.common.network.protocol.PushMergedData;
import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.protocol.TransportMessagesHelper;
import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
import org.apache.celeborn.common.network.sasl.SaslCredentials;
import org.apache.celeborn.common.network.server.BaseMessageHandler;
@@ -185,6 +186,8 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final Map<Integer, Tuple3<ReduceFileGroups, String, Exception>>
reduceFileGroupsMap =
JavaUtils.newConcurrentHashMap();
+ private final TransportMessagesHelper messagesHelper = new
TransportMessagesHelper();
+
public ShuffleClientImpl(String appUniqueId, CelebornConf conf,
UserIdentifier userIdentifier) {
super();
this.appUniqueId = appUniqueId;
@@ -1984,6 +1987,7 @@ public class ShuffleClientImpl extends ShuffleClient {
shuffleIdCache.clear();
pushExcludedWorkers.clear();
fetchExcludedWorkers.clear();
+ messagesHelper.close();
logger.warn("Shuffle client has been shutdown!");
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index a718a2b8b..285da2296 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -45,6 +45,7 @@ import org.apache.celeborn.common.identity.{IdentityProvider,
UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta,
ShufflePartitionLocationInfo, WorkerInfo}
import org.apache.celeborn.common.metrics.source.Role
+import org.apache.celeborn.common.network.protocol.TransportMessagesHelper
import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
@@ -237,6 +238,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)
+ private val messagesHelper: TransportMessagesHelper = new
TransportMessagesHelper()
+
// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is
executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method
`onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is
null when
@@ -287,6 +290,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
workerRpcEnvInUse.awaitTermination()
}
}
+ messagesHelper.close()
}
/**
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java
new file mode 100644
index 000000000..8cc8c79f0
--- /dev/null
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ExtensionRegistry;
+
+import org.apache.celeborn.common.protocol.TransportMessages;
+import org.apache.celeborn.common.util.ThreadUtils;
+
+/**
+ * Warms up the protobuf-generated {@link TransportMessages} class on a dedicated single-thread
+ * executor.
+ *
+ * <p>Invoking a static method of {@code TransportMessages} makes the JVM run its static code
+ * blocks, which build the message {@code Descriptor}s and {@code FieldAccessorTable}s. Doing this
+ * in the background is intended to keep that cost off the protobuf serialization path. Owners
+ * should call {@link #close()} when they stop.
+ */
+public class TransportMessagesHelper {
+
+  private final ScheduledExecutorService warmUpExecutor;
+  private final Future<?> warmUpTask;
+
+  /** Submits the warm-up task; the constructor returns without waiting for it to finish. */
+  public TransportMessagesHelper() {
+    ScheduledExecutorService executor =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("transport-messages-runner");
+    warmUpExecutor = executor;
+    warmUpTask = executor.submit(TransportMessagesHelper::preloadTransportMessages);
+  }
+
+  /**
+   * Pre-runs the static code blocks of {@link TransportMessages} by calling one of its static
+   * methods, which triggers class initialization. The registry passed in is a throwaway.
+   */
+  private static void preloadTransportMessages() {
+    TransportMessages.registerAllExtensions(ExtensionRegistry.newInstance());
+  }
+
+  /**
+   * Cancels the warm-up task (interrupting it if it is still running) and shuts the executor down
+   * via {@code ThreadUtils.shutdown}.
+   */
+  public void close() {
+    warmUpTask.cancel(true);
+    ThreadUtils.shutdown(warmUpExecutor);
+  }
+}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 6b5152409..1e52da3e2 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -42,7 +42,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo,
WorkerStatus}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource,
ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.CelebornRackResolver
-import org.apache.celeborn.common.network.protocol.TransportMessage
+import org.apache.celeborn.common.network.protocol.{TransportMessage,
TransportMessagesHelper}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.protocol.message.StatusCode
@@ -309,6 +309,8 @@ private[celeborn] class Master(
: util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] =
JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]()
+ private val messagesHelper: TransportMessagesHelper = new
TransportMessagesHelper()
+
// start threads to check timeout for workers and applications
override def onStart(): Unit = {
if (!threadsStarted.compareAndSet(false, true)) {
@@ -363,6 +365,7 @@ private[celeborn] class Master(
if (authEnabled) {
sendApplicationMetaExecutor.shutdownNow()
}
+ messagesHelper.close()
logInfo("Celeborn Master is stopped.")
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 5a229fa54..077adf727 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -38,6 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo,
WorkerPartitionLoc
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource,
ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.{CelebornRackResolver,
TransportContext}
+import org.apache.celeborn.common.network.protocol.TransportMessagesHelper
import org.apache.celeborn.common.network.sasl.SaslServerBootstrap
import org.apache.celeborn.common.network.server.TransportServerBootstrap
import org.apache.celeborn.common.network.util.TransportConf
@@ -363,6 +364,8 @@ private[celeborn] class Worker(
jvmQuake.start()
}
+ private val messagesHelper: TransportMessagesHelper = new
TransportMessagesHelper()
+
workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
@@ -623,6 +626,7 @@ private[celeborn] class Worker(
if (conf.internalPortEnabled) {
internalRpcEnvInUse.stop(internalRpcEndpointRef)
}
+ messagesHelper.close()
super.stop(exitKind)
logInfo("Worker is stopped.")