This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 564e55ea58 [ISSUE #8970] Remove redundant heartbeats (#8971)
564e55ea58 is described below
commit 564e55ea58ba10e366d1136b5381f10e5a5c58e0
Author: weihubeats <[email protected]>
AuthorDate: Tue Dec 10 14:54:42 2024 +0800
[ISSUE #8970] Remove redundant heartbeats (#8971)
---
.../client/impl/factory/MQClientInstance.java | 26 +++++++++++++++++-----
1 file changed, 21 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 8cc910487c..eba654c22d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl.factory;
+import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import java.util.Collections;
import java.util.HashMap;
@@ -35,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -66,6 +66,8 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
@@ -83,8 +85,6 @@ import
org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import static
org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
@@ -157,7 +157,9 @@ public class MQClientInstance {
ChannelEventListener channelEventListener;
if (clientConfig.isEnableHeartbeatChannelEventListener()) {
channelEventListener = new ChannelEventListener() {
+
private final ConcurrentMap<String, HashMap<Long, String>>
brokerAddrTable = MQClientInstance.this.brokerAddrTable;
+
@Override
public void onChannelConnect(String remoteAddr, Channel
channel) {
}
@@ -182,7 +184,7 @@ public class MQClientInstance {
if (addr.equals(remoteAddr)) {
long id = entry.getKey();
String brokerName = addressEntry.getKey();
- if (sendHeartbeatToBroker(id, brokerName,
addr)) {
+ if (sendHeartbeatToBroker(id, brokerName,
addr, false)) {
rebalanceImmediately();
}
break;
@@ -591,6 +593,18 @@ public class MQClientInstance {
}
public boolean sendHeartbeatToBroker(long id, String brokerName, String
addr) {
+ return sendHeartbeatToBroker(id, brokerName, addr, true);
+ }
+
+ /**
+ * @param id
+ * @param brokerName
+ * @param addr
+ * @param strictLockMode When the connection is initially established,
sending a heartbeat will simultaneously trigger the onChannelActive event to
acquire the lock again, causing an exception. Therefore,
+ * the exception that occurs when sending the
heartbeat during the initial onChannelActive event can be ignored.
+ * @return
+ */
+ public boolean sendHeartbeatToBroker(long id, String brokerName, String
addr, boolean strictLockMode) {
if (this.lockHeartbeat.tryLock()) {
final HeartbeatData heartbeatDataWithSub =
this.prepareHeartbeatData(false);
final boolean producerEmpty =
heartbeatDataWithSub.getProducerDataSet().isEmpty();
@@ -615,7 +629,9 @@ public class MQClientInstance {
this.lockHeartbeat.unlock();
}
} else {
- log.warn("lock heartBeat, but failed. [{}]", this.clientId);
+ if (strictLockMode) {
+ log.warn("lock heartBeat, but failed. [{}]", this.clientId);
+ }
}
return false;
}