This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 38f3bdd37 [CELEBORN-1909] Support pre-run static code blocks of 
TransportMessages to improve performance of protobuf serialization
38f3bdd37 is described below

commit 38f3bdd3759954d9a8fbbe95ae17ad155894cc33
Author: SteNicholas <[email protected]>
AuthorDate: Tue Mar 18 11:34:39 2025 +0800

    [CELEBORN-1909] Support pre-run static code blocks of TransportMessages to 
improve performance of protobuf serialization
    
    ### What changes were proposed in this pull request?
    
    Support pre-running the static initialization blocks of `TransportMessages` to improve protobuf serialization performance.
    
    ### Why are the changes needed?
    
    The protobuf message protocol defines many `map` type fields, which makes the first construction of these message instances slow. This is because `TransportMessages` contains static code blocks that initialize a large number of `Descriptor`s and `FieldAccessorTable`s, and instantiating a `FieldAccessorTable` involves reflection. Testing shows that these static code blocks take about 70 milliseconds to execute.
    
    Therefore, it's better to pre-run the static code blocks of `TransportMessages` in the background, so that this cost is not paid during the first protobuf serialization. In addition, it's recommended to use `repeated` instead of `map` type fields for RPC messages.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3149 from SteNicholas/CELEBORN-1909.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 CONTRIBUTING.md                                    |  3 ++
 .../apache/celeborn/client/ShuffleClientImpl.java  |  4 ++
 .../apache/celeborn/client/LifecycleManager.scala  |  4 ++
 .../network/protocol/TransportMessagesHelper.java  | 48 ++++++++++++++++++++++
 .../celeborn/service/deploy/master/Master.scala    |  5 ++-
 .../celeborn/service/deploy/worker/Worker.scala    |  4 ++
 6 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3f5303260..0fbf570e3 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -45,6 +45,9 @@ There are already some further improvements on the schedule 
and welcome to conta
 When you add new RPC message, it's recommended to follow raw PB message case, 
for example
 `RegisterWorker` and `RegisterWorkerResponse`. The RPC messages will be 
unified into raw PB messages eventually.
 
+### Using `repeated` instead of `map` type fields in RPC messages
+When adding fields to an RPC message, use `repeated` instead of `map` type fields. `TransportMessages` contains static code blocks that initialize many `Descriptor`s and `FieldAccessorTable`s, and instantiating a `FieldAccessorTable` involves reflection, so `map` fields add to the cost of initializing `TransportMessages`.
+
 ### Using Error Prone
 Error Prone is a static analysis tool for Java that catches common programming 
mistakes at compile-time.
 
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index cf48f95bc..be6b5aa3f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -56,6 +56,7 @@ import 
org.apache.celeborn.common.network.client.TransportClientFactory;
 import org.apache.celeborn.common.network.protocol.PushData;
 import org.apache.celeborn.common.network.protocol.PushMergedData;
 import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.protocol.TransportMessagesHelper;
 import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
 import org.apache.celeborn.common.network.sasl.SaslCredentials;
 import org.apache.celeborn.common.network.server.BaseMessageHandler;
@@ -185,6 +186,8 @@ public class ShuffleClientImpl extends ShuffleClient {
   protected final Map<Integer, Tuple3<ReduceFileGroups, String, Exception>> 
reduceFileGroupsMap =
       JavaUtils.newConcurrentHashMap();
 
+  private final TransportMessagesHelper messagesHelper = new 
TransportMessagesHelper();
+
   public ShuffleClientImpl(String appUniqueId, CelebornConf conf, 
UserIdentifier userIdentifier) {
     super();
     this.appUniqueId = appUniqueId;
@@ -1984,6 +1987,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     shuffleIdCache.clear();
     pushExcludedWorkers.clear();
     fetchExcludedWorkers.clear();
+    messagesHelper.close();
     logger.warn("Shuffle client has been shutdown!");
   }
 
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index a718a2b8b..285da2296 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -45,6 +45,7 @@ import org.apache.celeborn.common.identity.{IdentityProvider, 
UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ApplicationMeta, 
ShufflePartitionLocationInfo, WorkerInfo}
 import org.apache.celeborn.common.metrics.source.Role
+import org.apache.celeborn.common.network.protocol.TransportMessagesHelper
 import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
@@ -237,6 +238,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val changePartitionManager = new ChangePartitionManager(conf, this)
   private val releasePartitionManager = new ReleasePartitionManager(conf, this)
 
+  private val messagesHelper: TransportMessagesHelper = new 
TransportMessagesHelper()
+
   // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is 
executed, and
   // `masterClient` is initialized after `rpcEnv` is initialized, if method 
`onStart` contains
   // a reference to `masterClient`, there may be cases where `masterClient` is 
null when
@@ -287,6 +290,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         workerRpcEnvInUse.awaitTermination()
       }
     }
+    messagesHelper.close()
   }
 
   /**
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java
 
b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java
new file mode 100644
index 000000000..8cc8c79f0
--- /dev/null
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ExtensionRegistry;
+
+import org.apache.celeborn.common.protocol.TransportMessages;
+import org.apache.celeborn.common.util.ThreadUtils;
+
+/**
+ * Warms up the generated protobuf class {@link TransportMessages} on a background thread.
+ *
+ * <p>Creating an instance submits a single task to a dedicated executor obtained from {@code
+ * ThreadUtils.newDaemonSingleThreadScheduledExecutor}. The task calls {@code
+ * TransportMessages.registerAllExtensions}, which forces the class's static code blocks to run off
+ * the caller's thread. Call {@link #close()} when the owning component stops.
+ */
+public class TransportMessagesHelper {
+
+  /** Name given to the executor thread that runs the warm-up task. */
+  private static final String RUNNER_THREAD_NAME = "transport-messages-runner";
+
+  /** Single-thread executor that owns the warm-up task. */
+  private final ScheduledExecutorService warmUpExecutor;
+
+  /** Handle to the submitted warm-up task, kept so {@link #close()} can cancel it. */
+  private final Future<?> warmUpTask;
+
+  /** Creates the executor and immediately submits the warm-up task to it. */
+  public TransportMessagesHelper() {
+    this.warmUpExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(RUNNER_THREAD_NAME);
+    this.warmUpTask = this.warmUpExecutor.submit(TransportMessagesHelper::initTransportMessages);
+  }
+
+  /**
+   * Cancels the warm-up task, interrupting it if it is still running, and then shuts down the
+   * executor via {@code ThreadUtils.shutdown}.
+   */
+  public void close() {
+    warmUpTask.cancel(true);
+    ThreadUtils.shutdown(warmUpExecutor);
+  }
+
+  /**
+   * Touches {@link TransportMessages} so that its static code blocks, which build the message
+   * {@code Descriptor}s and {@code FieldAccessorTable}s, run before the first real serialization.
+   * The throwaway {@link ExtensionRegistry} only serves as the argument for this call.
+   */
+  private static void initTransportMessages() {
+    TransportMessages.registerAllExtensions(ExtensionRegistry.newInstance());
+  }
+}
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 6b5152409..1e52da3e2 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -42,7 +42,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, 
WorkerStatus}
 import org.apache.celeborn.common.metrics.MetricsSystem
 import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, 
ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource}
 import org.apache.celeborn.common.network.CelebornRackResolver
-import org.apache.celeborn.common.network.protocol.TransportMessage
+import org.apache.celeborn.common.network.protocol.{TransportMessage, 
TransportMessagesHelper}
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.protocol.message.StatusCode
@@ -309,6 +309,8 @@ private[celeborn] class Master(
       : util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] =
     JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]()
 
+  private val messagesHelper: TransportMessagesHelper = new 
TransportMessagesHelper()
+
   // start threads to check timeout for workers and applications
   override def onStart(): Unit = {
     if (!threadsStarted.compareAndSet(false, true)) {
@@ -363,6 +365,7 @@ private[celeborn] class Master(
     if (authEnabled) {
       sendApplicationMetaExecutor.shutdownNow()
     }
+    messagesHelper.close()
     logInfo("Celeborn Master is stopped.")
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 5a229fa54..077adf727 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -38,6 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, 
WorkerPartitionLoc
 import org.apache.celeborn.common.metrics.MetricsSystem
 import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, 
ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource}
 import org.apache.celeborn.common.network.{CelebornRackResolver, 
TransportContext}
+import org.apache.celeborn.common.network.protocol.TransportMessagesHelper
 import org.apache.celeborn.common.network.sasl.SaslServerBootstrap
 import org.apache.celeborn.common.network.server.TransportServerBootstrap
 import org.apache.celeborn.common.network.util.TransportConf
@@ -363,6 +364,8 @@ private[celeborn] class Worker(
     jvmQuake.start()
   }
 
+  private val messagesHelper: TransportMessagesHelper = new 
TransportMessagesHelper()
+
   workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
     workerInfo.getShuffleKeySet.size
   }
@@ -623,6 +626,7 @@ private[celeborn] class Worker(
       if (conf.internalPortEnabled) {
         internalRpcEnvInUse.stop(internalRpcEndpointRef)
       }
+      messagesHelper.close()
       super.stop(exitKind)
 
       logInfo("Worker is stopped.")

Reply via email to