This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 620e6a2544 [ISSUE #7642] Add return value for sendHeartbeat related method
620e6a2544 is described below
commit 620e6a25441441b6430ce377ba1c5734a5cc7dfa
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Dec 27 10:42:22 2023 +0800
[ISSUE #7642] Add return value for sendHeartbeat related method
Add sendHeartbeatToBroker
---
.../impl/consumer/DefaultMQPushConsumerImpl.java | 5 +-
.../client/impl/factory/MQClientInstance.java | 194 +++++++++++++--------
2 files changed, 126 insertions(+), 73 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index cbde258655..15563a4f0e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -999,8 +999,9 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- this.mQClientFactory.rebalanceImmediately();
+ if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
+ this.mQClientFactory.rebalanceImmediately();
+ }
}
private void checkConfig() throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index ba72a6dce7..ad39372d35 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -176,9 +176,14 @@ public class MQClientInstance {
@Override
public void onChannelActive(String remoteAddr, Channel
channel) {
for (Map.Entry<String, HashMap<Long, String>> addressEntry
: brokerAddrTable.entrySet()) {
- for (String address :
addressEntry.getValue().values()) {
- if (address.equals(remoteAddr)) {
- sendHeartbeatToAllBrokerWithLockV2(false);
+ for (Map.Entry<Long, String> entry :
addressEntry.getValue().entrySet()) {
+ String addr = entry.getValue();
+ if (addr.equals(remoteAddr)) {
+ long id = entry.getKey();
+ String brokerName = addressEntry.getKey();
+ if (sendHeartbeatToBroker(id, brokerName,
addr)) {
+ rebalanceImmediately();
+ }
break;
}
}
@@ -504,13 +509,13 @@ public class MQClientInstance {
}
}
- public void sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
+ public boolean sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
if (this.lockHeartbeat.tryLock()) {
try {
if (clientConfig.isUseHeartbeatV2()) {
- this.sendHeartbeatToAllBrokerV2(isRebalance);
+ return this.sendHeartbeatToAllBrokerV2(isRebalance);
} else {
- this.sendHeartbeatToAllBroker();
+ return this.sendHeartbeatToAllBroker();
}
} catch (final Exception e) {
log.error("sendHeartbeatToAllBrokerWithLockV2 exception", e);
@@ -520,15 +525,16 @@ public class MQClientInstance {
} else {
log.warn("sendHeartbeatToAllBrokerWithLockV2 lock heartBeat, but
failed.");
}
+ return false;
}
- public void sendHeartbeatToAllBrokerWithLock() {
+ public boolean sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
if (clientConfig.isUseHeartbeatV2()) {
- this.sendHeartbeatToAllBrokerV2(false);
+ return this.sendHeartbeatToAllBrokerV2(false);
} else {
- this.sendHeartbeatToAllBroker();
+ return this.sendHeartbeatToAllBroker();
}
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
@@ -538,6 +544,7 @@ public class MQClientInstance {
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
+ return false;
}
private void persistAllConsumerOffset() {
@@ -582,19 +589,72 @@ public class MQClientInstance {
return false;
}
- private void sendHeartbeatToAllBroker() {
+ public boolean sendHeartbeatToBroker(long id, String brokerName, String
addr) {
+ if (this.lockHeartbeat.tryLock()) {
+ final HeartbeatData heartbeatDataWithSub =
this.prepareHeartbeatData(false);
+ final boolean producerEmpty =
heartbeatDataWithSub.getProducerDataSet().isEmpty();
+ final boolean consumerEmpty =
heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+ if (producerEmpty && consumerEmpty) {
+ log.warn("sendHeartbeatToBroker sending heartbeat, but no
consumer and no producer. [{}]", this.clientId);
+ return false;
+ }
+ try {
+ if (clientConfig.isUseHeartbeatV2()) {
+ int currentHeartbeatFingerprint =
heartbeatDataWithSub.computeHeartbeatFingerprint();
+
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+ HeartbeatData heartbeatDataWithoutSub =
this.prepareHeartbeatData(true);
+
heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+ return this.sendHeartbeatToBrokerV2(id, brokerName, addr,
heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint);
+ } else {
+ return this.sendHeartbeatToBroker(id, brokerName, addr,
heartbeatDataWithSub);
+ }
+ } catch (final Exception e) {
+ log.error("sendHeartbeatToAllBroker exception", e);
+ } finally {
+ this.lockHeartbeat.unlock();
+ }
+ } else {
+ log.warn("lock heartBeat, but failed. [{}]", this.clientId);
+ }
+ return false;
+ }
+
+ private boolean sendHeartbeatToBroker(long id, String brokerName, String
addr, HeartbeatData heartbeatData) {
+ try {
+ int version = this.mQClientAPIImpl.sendHeartbeat(addr,
heartbeatData, clientConfig.getMqClientApiTimeout());
+ if (!this.brokerVersionTable.containsKey(brokerName)) {
+ this.brokerVersionTable.put(brokerName, new HashMap<>(4));
+ }
+ this.brokerVersionTable.get(brokerName).put(addr, version);
+ long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+ if (times % 20 == 0) {
+ log.info("send heart beat to broker[{} {} {}] success",
brokerName, id, addr);
+ log.info(heartbeatData.toString());
+ }
+ return true;
+ } catch (Exception e) {
+ if (this.isBrokerInNameServer(addr)) {
+ log.warn("send heart beat to broker[{} {} {}] failed",
brokerName, id, addr, e);
+ } else {
+ log.warn("send heart beat to broker[{} {} {}] exception,
because the broker not up, forget it", brokerName,
+ id, addr, e);
+ }
+ }
+ return false;
+ }
+
+ private boolean sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData(false);
final boolean producerEmpty =
heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty =
heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer.
[{}]", this.clientId);
- return;
+ return false;
}
if (this.brokerAddrTable.isEmpty()) {
- return;
+ return false;
}
- long times = this.sendHeartbeatTimesTotal.getAndIncrement();
for (Entry<String, HashMap<Long, String>> brokerClusterInfo :
this.brokerAddrTable.entrySet()) {
String brokerName = brokerClusterInfo.getKey();
HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
@@ -611,43 +671,71 @@ public class MQClientInstance {
continue;
}
- try {
- int version = this.mQClientAPIImpl.sendHeartbeat(addr,
heartbeatData, clientConfig.getMqClientApiTimeout());
- if (!this.brokerVersionTable.containsKey(brokerName)) {
- this.brokerVersionTable.put(brokerName, new
HashMap<>(4));
- }
- this.brokerVersionTable.get(brokerName).put(addr, version);
- if (times % 20 == 0) {
- log.info("send heart beat to broker[{} {} {}]
success", brokerName, id, addr);
- log.info(heartbeatData.toString());
- }
- } catch (Exception e) {
- if (this.isBrokerInNameServer(addr)) {
- log.warn("send heart beat to broker[{} {} {}] failed",
brokerName, id, addr, e);
- } else {
- log.warn("send heart beat to broker[{} {} {}]
exception, because the broker not up, forget it", brokerName,
- id, addr, e);
+ sendHeartbeatToBroker(id, brokerName, addr, heartbeatData);
+ }
+ }
+ return true;
+ }
+
+ private boolean sendHeartbeatToBrokerV2(long id, String brokerName, String
addr, HeartbeatData heartbeatDataWithSub,
+ HeartbeatData heartbeatDataWithoutSub, int
currentHeartbeatFingerprint) {
+ try {
+ int version = 0;
+ boolean isBrokerSupportV2 =
brokerSupportV2HeartbeatSet.contains(addr);
+ HeartbeatV2Result heartbeatV2Result = null;
+ if (isBrokerSupportV2 && null !=
brokerAddrHeartbeatFingerprintTable.get(addr) &&
brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+ heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr,
heartbeatDataWithoutSub, clientConfig.getMqClientApiTimeout());
+ if (heartbeatV2Result.isSubChange()) {
+ brokerAddrHeartbeatFingerprintTable.remove(addr);
+ }
+ log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {}
subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName,
heartbeatV2Result.isSubChange(),
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+ } else {
+ heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr,
heartbeatDataWithSub, clientConfig.getMqClientApiTimeout());
+ if (heartbeatV2Result.isSupportV2()) {
+ brokerSupportV2HeartbeatSet.add(addr);
+ if (heartbeatV2Result.isSubChange()) {
+ brokerAddrHeartbeatFingerprintTable.remove(addr);
+ } else if
(!brokerAddrHeartbeatFingerprintTable.containsKey(addr) ||
brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) {
+ brokerAddrHeartbeatFingerprintTable.put(addr,
currentHeartbeatFingerprint);
}
}
+ log.info("sendHeartbeatToAllBrokerV2 normal brokerName: {}
subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName,
heartbeatV2Result.isSubChange(),
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+ }
+ version = heartbeatV2Result.getVersion();
+ if (!this.brokerVersionTable.containsKey(brokerName)) {
+ this.brokerVersionTable.put(brokerName, new HashMap<>(4));
+ }
+ this.brokerVersionTable.get(brokerName).put(addr, version);
+ long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+ if (times % 20 == 0) {
+ log.info("send heart beat to broker[{} {} {}] success",
brokerName, id, addr);
+ log.info(heartbeatDataWithSub.toString());
+ }
+ return true;
+ } catch (Exception e) {
+ if (this.isBrokerInNameServer(addr)) {
+ log.warn("sendHeartbeatToAllBrokerV2 send heart beat to
broker[{} {} {}] failed", brokerName, id, addr, e);
+ } else {
+ log.warn("sendHeartbeatToAllBrokerV2 send heart beat to
broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
+ return false;
}
- private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+ private boolean sendHeartbeatToAllBrokerV2(boolean isRebalance) {
final HeartbeatData heartbeatDataWithSub =
this.prepareHeartbeatData(false);
final boolean producerEmpty =
heartbeatDataWithSub.getProducerDataSet().isEmpty();
final boolean consumerEmpty =
heartbeatDataWithSub.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no
consumer and no producer. [{}]", this.clientId);
- return;
+ return false;
}
if (this.brokerAddrTable.isEmpty()) {
- return;
+ return false;
}
if (isRebalance) {
resetBrokerAddrHeartbeatFingerprintMap();
}
- long times = this.sendHeartbeatTimesTotal.getAndIncrement();
int currentHeartbeatFingerprint =
heartbeatDataWithSub.computeHeartbeatFingerprint();
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
HeartbeatData heartbeatDataWithoutSub =
this.prepareHeartbeatData(true);
@@ -668,46 +756,10 @@ public class MQClientInstance {
if (consumerEmpty && MixAll.MASTER_ID != id) {
continue;
}
- try {
- int version = 0;
- boolean isBrokerSupportV2 =
brokerSupportV2HeartbeatSet.contains(addr);
- HeartbeatV2Result heartbeatV2Result = null;
- if (isBrokerSupportV2 && null !=
brokerAddrHeartbeatFingerprintTable.get(addr) &&
brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
- heartbeatV2Result =
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub,
clientConfig.getMqClientApiTimeout());
- if (heartbeatV2Result.isSubChange()) {
- brokerAddrHeartbeatFingerprintTable.remove(addr);
- }
- log.info("sendHeartbeatToAllBrokerV2 simple
brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}",
brokerName, heartbeatV2Result.isSubChange(),
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
- } else {
- heartbeatV2Result =
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub,
clientConfig.getMqClientApiTimeout());
- if (heartbeatV2Result.isSupportV2()) {
- brokerSupportV2HeartbeatSet.add(addr);
- if (heartbeatV2Result.isSubChange()) {
-
brokerAddrHeartbeatFingerprintTable.remove(addr);
- } else if
(!brokerAddrHeartbeatFingerprintTable.containsKey(addr) ||
brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) {
- brokerAddrHeartbeatFingerprintTable.put(addr,
currentHeartbeatFingerprint);
- }
- }
- log.info("sendHeartbeatToAllBrokerV2 normal
brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}",
brokerName, heartbeatV2Result.isSubChange(),
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
- }
- version = heartbeatV2Result.getVersion();
- if (!this.brokerVersionTable.containsKey(brokerName)) {
- this.brokerVersionTable.put(brokerName, new
HashMap<>(4));
- }
- this.brokerVersionTable.get(brokerName).put(addr, version);
- if (times % 20 == 0) {
- log.info("send heart beat to broker[{} {} {}]
success", brokerName, id, addr);
- log.info(heartbeatDataWithSub.toString());
- }
- } catch (Exception e) {
- if (this.isBrokerInNameServer(addr)) {
- log.warn("sendHeartbeatToAllBrokerV2 send heart beat
to broker[{} {} {}] failed", brokerName, id, addr, e);
- } else {
- log.warn("sendHeartbeatToAllBrokerV2 send heart beat
to broker[{} {} {}] exception, because the broker not up, forget it",
brokerName, id, addr, e);
- }
- }
+ sendHeartbeatToBrokerV2(id, brokerName, addr,
heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint);
}
}
+ return true;
}
public boolean updateTopicRouteInfoFromNameServer(final String topic,
boolean isDefault,