This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 527bb9872d [ISSUE #9900] Supports sending heartbeats to the broker 
concurrently (#9901)
527bb9872d is described below

commit 527bb9872d08d20b6bbd557b9f218786a7193f9d
Author: wizcraft_kris <[email protected]>
AuthorDate: Wed Jan 14 11:15:18 2026 +0800

    [ISSUE #9900] Supports sending heartbeats to the broker concurrently (#9901)
---
 .../org/apache/rocketmq/client/ClientConfig.java   |  26 +++++
 .../client/impl/factory/MQClientInstance.java      | 117 ++++++++++++++++++++-
 .../client/impl/factory/MQClientInstanceTest.java  |  48 ++++++++-
 3 files changed, 186 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java 
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 79cb04af1d..9e01225432 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -101,6 +101,10 @@ public class ClientConfig {
 
     private boolean enableHeartbeatChannelEventListener = true;
 
+    private boolean enableConcurrentHeartbeat = false;
+
+    private int concurrentHeartbeatThreadPoolSize = 
Runtime.getRuntime().availableProcessors();
+
     /**
      * The switch for message trace
      */
@@ -240,6 +244,8 @@ public class ClientConfig {
         this.namespaceV2 = cc.namespaceV2;
         this.enableTrace = cc.enableTrace;
         this.traceTopic = cc.traceTopic;
+        this.enableConcurrentHeartbeat = cc.enableConcurrentHeartbeat;
+        this.concurrentHeartbeatThreadPoolSize = 
cc.concurrentHeartbeatThreadPoolSize;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -272,6 +278,8 @@ public class ClientConfig {
         cc.namespaceV2 = namespaceV2;
         cc.enableTrace = enableTrace;
         cc.traceTopic = traceTopic;
+        cc.enableConcurrentHeartbeat = enableConcurrentHeartbeat;
+        cc.concurrentHeartbeatThreadPoolSize = 
concurrentHeartbeatThreadPoolSize;
         return cc;
     }
 
@@ -525,6 +533,22 @@ public class ClientConfig {
         this.maxPageSizeInGetMetadata = maxPageSizeInGetMetadata;
     }
 
+    public boolean isEnableConcurrentHeartbeat() {
+        return this.enableConcurrentHeartbeat;
+    }
+
+    public void setEnableConcurrentHeartbeat(boolean 
enableConcurrentHeartbeat) {
+        this.enableConcurrentHeartbeat = enableConcurrentHeartbeat;
+    }
+
+    public int getConcurrentHeartbeatThreadPoolSize() {
+        return concurrentHeartbeatThreadPoolSize;
+    }
+
+    public void setConcurrentHeartbeatThreadPoolSize(int 
concurrentHeartbeatThreadPoolSize) {
+        this.concurrentHeartbeatThreadPoolSize = 
concurrentHeartbeatThreadPoolSize;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig{" +
@@ -558,6 +582,8 @@ public class ClientConfig {
             ", enableHeartbeatChannelEventListener=" + 
enableHeartbeatChannelEventListener +
             ", enableTrace=" + enableTrace +
             ", traceTopic='" + traceTopic + '\'' +
+            ", enableConcurrentHeartbeat=" + enableConcurrentHeartbeat +
+            ", concurrentHeartbeatThreadPoolSize=" + 
concurrentHeartbeatThreadPoolSize +
             '}';
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index bb838a6265..df93155c36 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -68,6 +69,7 @@ import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -79,7 +81,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -125,7 +130,7 @@ public class MQClientInstance {
      */
     private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable 
= new ConcurrentHashMap<>();
 
-    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* 
address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String/* Broker Name */, 
ConcurrentHashMap<String/* address */, Integer>> brokerVersionTable = new 
ConcurrentHashMap<>();
     private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet 
= new HashSet<>();
     private final ConcurrentMap<String, Integer> 
brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>();
     private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"MQClientFactoryScheduledThread"));
@@ -142,6 +147,7 @@ public class MQClientInstance {
     private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
     private ServiceState serviceState = ServiceState.CREATE_JUST;
     private final Random random = new Random();
+    private ExecutorService concurrentHeartbeatExecutor;
 
     public MQClientInstance(ClientConfig clientConfig, int instanceIndex, 
String clientId) {
         this(clientConfig, instanceIndex, clientId, null);
@@ -217,6 +223,12 @@ public class MQClientInstance {
 
         this.consumerStatsManager = new 
ConsumerStatsManager(this.scheduledExecutorService);
 
+        if (this.clientConfig.isEnableConcurrentHeartbeat()) {
+            this.concurrentHeartbeatExecutor = Executors.newFixedThreadPool(
+                clientConfig.getConcurrentHeartbeatThreadPoolSize(),
+                new ThreadFactoryImpl("MQClientConcurrentHeartbeatThread_", 
true));
+        }
+
         log.info("Created a new client Instance, InstanceIndex:{}, 
ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
             instanceIndex,
             this.clientId,
@@ -537,6 +549,8 @@ public class MQClientInstance {
             try {
                 if (clientConfig.isUseHeartbeatV2()) {
                     return this.sendHeartbeatToAllBrokerV2(false);
+                } else if (clientConfig.isEnableConcurrentHeartbeat()) {
+                    return this.sendHeartbeatToAllBrokerConcurrently();
                 } else {
                     return this.sendHeartbeatToAllBroker();
                 }
@@ -641,7 +655,7 @@ public class MQClientInstance {
         try {
             int version = this.mQClientAPIImpl.sendHeartbeat(addr, 
heartbeatData, clientConfig.getMqClientApiTimeout());
             if (!this.brokerVersionTable.containsKey(brokerName)) {
-                this.brokerVersionTable.put(brokerName, new HashMap<>(4));
+                this.brokerVersionTable.put(brokerName, new 
ConcurrentHashMap<>(4));
             }
             this.brokerVersionTable.get(brokerName).put(addr, version);
             long times = this.sendHeartbeatTimesTotal.getAndIncrement();
@@ -721,7 +735,7 @@ public class MQClientInstance {
             }
             version = heartbeatV2Result.getVersion();
             if (!this.brokerVersionTable.containsKey(brokerName)) {
-                this.brokerVersionTable.put(brokerName, new HashMap<>(4));
+                this.brokerVersionTable.put(brokerName, new 
ConcurrentHashMap<>(4));
             }
             this.brokerVersionTable.get(brokerName).put(addr, version);
             long times = this.sendHeartbeatTimesTotal.getAndIncrement();
@@ -780,6 +794,100 @@ public class MQClientInstance {
         return true;
     }
 
+    private class ClientHeartBeatTask {
+        private final String brokerName;
+        private final Long brokerId;
+        private final String brokerAddr;
+        private final HeartbeatData heartbeatData;
+
+        public ClientHeartBeatTask(String brokerName, Long brokerId, String 
brokerAddr, HeartbeatData heartbeatData) {
+            this.brokerName = brokerName;
+            this.brokerId = brokerId;
+            this.brokerAddr = brokerAddr;
+            this.heartbeatData = heartbeatData;
+        }
+
+        public void execute() throws Exception {
+            int version = MQClientInstance.this.mQClientAPIImpl.sendHeartbeat(
+                brokerAddr, heartbeatData, 
MQClientInstance.this.clientConfig.getMqClientApiTimeout());
+
+            ConcurrentHashMap<String, Integer> inner = 
MQClientInstance.this.brokerVersionTable
+                .computeIfAbsent(brokerName, k -> new ConcurrentHashMap<>(4));
+            inner.put(brokerAddr, version);
+        }
+    }
+
+    private boolean sendHeartbeatToAllBrokerConcurrently() {
+        final HeartbeatData heartbeatData = this.prepareHeartbeatData(false);
+        final boolean producerEmpty = 
heartbeatData.getProducerDataSet().isEmpty();
+        final boolean consumerEmpty = 
heartbeatData.getConsumerDataSet().isEmpty();
+
+        if (producerEmpty && consumerEmpty) {
+            log.warn("sending heartbeat, but no consumer and no producer. 
[{}]", this.clientId);
+            return false;
+        }
+
+        if (this.brokerAddrTable.isEmpty()) {
+            return false;
+        }
+
+        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+        List<ClientHeartBeatTask> tasks = new ArrayList<>();
+        for (Entry<String, HashMap<Long, String>> entry : 
this.brokerAddrTable.entrySet()) {
+            String brokerName = entry.getKey();
+            HashMap<Long, String> oneTable = entry.getValue();
+            if (oneTable != null) {
+                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
+                    Long id = entry1.getKey();
+                    String addr = entry1.getValue();
+                    if (addr == null) continue;
+                    if (consumerEmpty && id != MixAll.MASTER_ID) continue;
+                    tasks.add(new ClientHeartBeatTask(brokerName, id, addr, 
heartbeatData));
+                }
+            }
+        }
+
+        if (tasks.isEmpty()) {
+            return false;
+        }
+
+        final CountDownLatch latch = new CountDownLatch(tasks.size());
+
+        for (ClientHeartBeatTask task : tasks) {
+            try {
+                this.concurrentHeartbeatExecutor.execute(() -> {
+                    try {
+                        task.execute();
+                        if (times % 20 == 0) {
+                            log.info("send heart beat to broker[{} {} {}] 
success", task.brokerName, task.brokerId, task.brokerAddr);
+                        }
+                    } catch (Exception e) {
+                        if 
(MQClientInstance.this.isBrokerInNameServer(task.brokerAddr)) {
+                            log.warn("send heart beat to broker[{} {} {}] 
failed", task.brokerName, task.brokerId, task.brokerAddr, e);
+                        } else {
+                            log.warn("send heart beat to broker[{} {} {}] 
exception, because the broker not up, forget it",
+                                task.brokerName, task.brokerId, 
task.brokerAddr, e);
+                        }
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            } catch (RejectedExecutionException rex) {
+                log.warn("heartbeat submission rejected for broker[{} {} {}], 
will skip this round", task.brokerName, task.brokerId, task.brokerAddr, rex);
+                latch.countDown();
+            }
+        }
+
+        try {
+            // wait all tasks finish
+            latch.await();
+        } catch (InterruptedException ie) {
+            log.warn("Interrupted while waiting for broker heartbeat tasks to 
complete", ie);
+            Thread.currentThread().interrupt();
+        }
+        return true;
+    }
+
     public boolean updateTopicRouteInfoFromNameServer(final String topic, 
boolean isDefault,
         DefaultMQProducer defaultMQProducer) {
         try {
@@ -971,6 +1079,9 @@ public class MQClientInstance {
                     this.scheduledExecutorService.shutdown();
                     this.mQClientAPIImpl.shutdown();
                     this.rebalanceService.shutdown();
+                    if (concurrentHeartbeatExecutor != null) {
+                        this.concurrentHeartbeatExecutor.shutdown();
+                    }
 
                     
MQClientManager.getInstance().removeClientFactory(this.clientId);
                     log.info("the client factory [{}] shutdown OK", 
this.clientId);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index 39cff5db82..376ff9da8e 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -74,6 +74,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -82,9 +83,11 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -345,8 +348,8 @@ public class MQClientInstanceTest {
     public void testFindBrokerAddressInSubscribeWithOneBroker() throws 
IllegalAccessException {
         brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
         consumerTable.put(group, createMQConsumerInner());
-        ConcurrentMap<String, HashMap<String, Integer>> brokerVersionTable = 
new ConcurrentHashMap<>();
-        HashMap<String, Integer> addressMap = new HashMap<>();
+        ConcurrentMap<String, ConcurrentHashMap<String, Integer>> 
brokerVersionTable = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, Integer> addressMap = new 
ConcurrentHashMap<>();
         addressMap.put(defaultBrokerAddr, 0);
         brokerVersionTable.put(defaultBroker, addressMap);
         FieldUtils.writeDeclaredField(mqClientInstance, "brokerVersionTable", 
brokerVersionTable, true);
@@ -510,4 +513,45 @@ public class MQClientInstanceTest {
         brokerData.setBrokerAddrs(brokerAddrs);
         return Collections.singletonList(brokerData);
     }
+
+    @Test
+    public void testSendHeartbeatToAllBrokerConcurrently() {
+        try {
+            String brokerName = "BrokerA";
+            HashMap<Long, String> addrMap = new HashMap<>();
+            addrMap.put(0L, "127.0.0.1:10911");
+            addrMap.put(1L, "127.0.0.1:10912");
+            addrMap.put(2L, "127.0.0.1:10913");
+            brokerAddrTable.put(brokerName, addrMap);
+
+            DefaultMQPushConsumerImpl mockConsumer = 
mock(DefaultMQPushConsumerImpl.class);
+            
when(mockConsumer.subscriptions()).thenReturn(Collections.singleton(new 
SubscriptionData()));
+            mqClientInstance.registerConsumer("TestConsumerGroup", 
mockConsumer);
+
+            ClientConfig clientConfig = new ClientConfig();
+            FieldUtils.writeDeclaredField(clientConfig, 
"enableConcurrentHeartbeat", true, true);
+            FieldUtils.writeDeclaredField(mqClientInstance, "clientConfig", 
clientConfig, true);
+
+            ExecutorService mockExecutor = mock(ExecutorService.class);
+            doAnswer(invocation -> {
+                try {
+                    Runnable task = invocation.getArgument(0);
+                    task.run();
+                } catch (Exception e) {
+                    // ignore
+                }
+                return null;
+            }).when(mockExecutor).execute(any(Runnable.class));
+            FieldUtils.writeDeclaredField(mqClientInstance, 
"concurrentHeartbeatExecutor", mockExecutor, true);
+            MQClientAPIImpl mockMqClientAPIImpl = mock(MQClientAPIImpl.class);
+            FieldUtils.writeDeclaredField(mqClientInstance, "mQClientAPIImpl", 
mockMqClientAPIImpl, true);
+
+            mqClientInstance.sendHeartbeatToAllBrokerWithLock();
+
+            assertTrue(true);
+
+        } catch (Exception e) {
+            fail("failed: " + e.getMessage());
+        }
+    }
 }

Reply via email to