RongtongJin commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1225681656
##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
}
}
+ private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+ final HeartbeatData heartbeatDataWithSub =
this.prepareHeartbeatData(false);
+ final boolean producerEmpty =
heartbeatDataWithSub.getProducerDataSet().isEmpty();
+ final boolean consumerEmpty =
heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+ if (producerEmpty && consumerEmpty) {
+ log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no
consumer and no producer. [{}]", this.clientId);
+ return;
+ }
+ if (this.brokerAddrTable.isEmpty()) {
+ return;
+ }
+ if (isRebalance) {
+ resetBrokerAddrHeartbeatFingerprintMap();
+ }
+ long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+ int currentHeartbeatFingerprint =
heartbeatDataWithSub.computeHeartbeatFingerprint();
+
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+ HeartbeatData heartbeatDataWithoutSub =
this.prepareHeartbeatData(true);
+
heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+ for (Entry<String, HashMap<Long, String>> brokerClusterInfo :
this.brokerAddrTable.entrySet()) {
+ String brokerName = brokerClusterInfo.getKey();
+ HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+ if (oneTable == null) {
+ continue;
+ }
+ for (Entry<Long, String> singleBrokerInstance :
oneTable.entrySet()) {
+ Long id = singleBrokerInstance.getKey();
+ String addr = singleBrokerInstance.getValue();
+ if (addr == null) {
+ continue;
+ }
+ if (consumerEmpty && MixAll.MASTER_ID != id) {
+ continue;
+ }
+ try {
+ int version = 0;
+ boolean isBrokerSupportV2 =
brokerSupportV2HeartbeatSet.contains(addr);
+ HeartbeatV2Result heartbeatV2Result = null;
+ if (isBrokerSupportV2 && null !=
brokerAddrHeartbeatFingerprintTable.get(addr) &&
brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+ heartbeatV2Result =
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);
Review Comment:
How about replacing the hard-coded 3000 with clientConfig.getMqClientApiTimeout(), so the heartbeat timeout follows the client's configured API timeout?
##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
}
}
+ private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+ final HeartbeatData heartbeatDataWithSub =
this.prepareHeartbeatData(false);
+ final boolean producerEmpty =
heartbeatDataWithSub.getProducerDataSet().isEmpty();
+ final boolean consumerEmpty =
heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+ if (producerEmpty && consumerEmpty) {
+ log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no
consumer and no producer. [{}]", this.clientId);
+ return;
+ }
+ if (this.brokerAddrTable.isEmpty()) {
+ return;
+ }
+ if (isRebalance) {
+ resetBrokerAddrHeartbeatFingerprintMap();
+ }
+ long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+ int currentHeartbeatFingerprint =
heartbeatDataWithSub.computeHeartbeatFingerprint();
+
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+ HeartbeatData heartbeatDataWithoutSub =
this.prepareHeartbeatData(true);
+
heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+ for (Entry<String, HashMap<Long, String>> brokerClusterInfo :
this.brokerAddrTable.entrySet()) {
+ String brokerName = brokerClusterInfo.getKey();
+ HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+ if (oneTable == null) {
+ continue;
+ }
+ for (Entry<Long, String> singleBrokerInstance :
oneTable.entrySet()) {
+ Long id = singleBrokerInstance.getKey();
+ String addr = singleBrokerInstance.getValue();
+ if (addr == null) {
+ continue;
+ }
+ if (consumerEmpty && MixAll.MASTER_ID != id) {
+ continue;
+ }
+ try {
+ int version = 0;
+ boolean isBrokerSupportV2 =
brokerSupportV2HeartbeatSet.contains(addr);
+ HeartbeatV2Result heartbeatV2Result = null;
+ if (isBrokerSupportV2 && null !=
brokerAddrHeartbeatFingerprintTable.get(addr) &&
brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+ heartbeatV2Result =
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);
+ if (heartbeatV2Result.isSubChange()) {
+ brokerAddrHeartbeatFingerprintTable.remove(addr);
+ }
+ log.info("sendHeartbeatToAllBrokerV2 simple
brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}",
brokerName, heartbeatV2Result.isSubChange(),
JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+ } else {
+ heartbeatV2Result =
this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, 3000);
Review Comment:
Same suggestion here: consider replacing the hard-coded 3000 with clientConfig.getMqClientApiTimeout().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]