This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d117742e3 [ISSUE #4847]Optimize
DefaultBrokerHeartbeatManager#onBrokerHeartbeat mehtod code logic (#4848)
d117742e3 is described below
commit d117742e3c4fa8e1bf001c02512ee091bff8797e
Author: mxsm <[email protected]>
AuthorDate: Sat Aug 20 09:54:04 2022 +0800
[ISSUE #4847]Optimize DefaultBrokerHeartbeatManager#onBrokerHeartbeat
mehtod code logic (#4848)
---
.../impl/DefaultBrokerHeartbeatManager.java | 49 +++++++++++-----------
1 file changed, 24 insertions(+), 25 deletions(-)
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 95cc85197..e56d97c99 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -80,7 +81,7 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
RemotingUtil.closeChannel(channel);
}
this.executor.submit(() ->
-
notifyBrokerInActive(next.getKey().getClusterName(),
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(),
next.getValue().getBrokerId()));
+ notifyBrokerInActive(next.getKey().getClusterName(),
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(),
next.getValue().getBrokerId()));
log.warn("The broker channel {} expired, brokerInfo {},
expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis);
}
}
@@ -102,15 +103,15 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
@Override
public void registerBroker(String clusterName, String brokerName, String
brokerAddr,
- long brokerId, Long timeoutMillis, Channel
channel, Integer epoch, Long maxOffset) {
+ long brokerId, Long timeoutMillis, Channel channel, Integer epoch,
Long maxOffset) {
final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName,
brokerAddr);
final BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(addrInfo,
- new BrokerLiveInfo(brokerName,
- brokerAddr,
- brokerId,
- System.currentTimeMillis(),
- timeoutMillis == null ?
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- channel, epoch == null ? -1 : epoch, maxOffset == null
? -1 : maxOffset));
+ new BrokerLiveInfo(brokerName,
+ brokerAddr,
+ brokerId,
+ System.currentTimeMillis(),
+ timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME :
timeoutMillis,
+ channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 :
maxOffset));
if (prevBrokerLiveInfo == null) {
log.info("new broker registered, {}, brokerId:{}", addrInfo,
brokerId);
}
@@ -127,24 +128,22 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
}
@Override
- public void onBrokerHeartbeat(String clusterName, String brokerAddr,
Integer epoch, Long maxOffset, Long confirmOffset) {
+ public void onBrokerHeartbeat(String clusterName, String brokerAddr,
Integer epoch, Long maxOffset,
+ Long confirmOffset) {
BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
- int realEpoch = epoch == null ? -1 : epoch;
- long realMaxOffset = maxOffset == null ? -1 : maxOffset;
- long realConfirmOffset = confirmOffset == null ? -1 : confirmOffset;
- if (prev != null) {
- prev.setLastUpdateTimestamp(System.currentTimeMillis());
- if (realEpoch > prev.getEpoch()) {
- prev.setEpoch(realEpoch);
- prev.setMaxOffset(realMaxOffset);
- prev.setConfirmOffset(realConfirmOffset);
- } else if (realEpoch == prev.getEpoch()) {
- if (realMaxOffset > prev.getMaxOffset()) {
- prev.setMaxOffset(realMaxOffset);
- prev.setConfirmOffset(realConfirmOffset);
- }
- }
+ if (null == prev) {
+ return;
+ }
+ int realEpoch = Optional.ofNullable(epoch).orElse(-1);
+ long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L);
+ long realConfirmOffset =
Optional.ofNullable(confirmOffset).orElse(-1L);
+
+ prev.setLastUpdateTimestamp(System.currentTimeMillis());
+ if (realEpoch > prev.getEpoch() || (realEpoch == prev.getEpoch() &&
realMaxOffset > prev.getMaxOffset())) {
+ prev.setEpoch(realEpoch);
+ prev.setMaxOffset(realMaxOffset);
+ prev.setConfirmOffset(realConfirmOffset);
}
}
@@ -156,7 +155,7 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
log.info("Channel {} inactive, broker {}, addr:{}, id:{}",
entry.getValue().getChannel(), entry.getValue().getBrokerName(),
entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId());
addrInfo = entry.getKey();
this.executor.submit(() ->
- notifyBrokerInActive(entry.getKey().getClusterName(),
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(),
entry.getValue().getBrokerId()));
+ notifyBrokerInActive(entry.getKey().getClusterName(),
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(),
entry.getValue().getBrokerId()));
break;
}
}