This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d117742e3 [ISSUE #4847]Optimize 
DefaultBrokerHeartbeatManager#onBrokerHeartbeat method code logic (#4848)
d117742e3 is described below

commit d117742e3c4fa8e1bf001c02512ee091bff8797e
Author: mxsm <[email protected]>
AuthorDate: Sat Aug 20 09:54:04 2022 +0800

    [ISSUE #4847]Optimize DefaultBrokerHeartbeatManager#onBrokerHeartbeat 
method code logic (#4848)
---
 .../impl/DefaultBrokerHeartbeatManager.java        | 49 +++++++++++-----------
 1 file changed, 24 insertions(+), 25 deletions(-)

diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 95cc85197..e56d97c99 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -80,7 +81,7 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
                         RemotingUtil.closeChannel(channel);
                     }
                     this.executor.submit(() ->
-                            
notifyBrokerInActive(next.getKey().getClusterName(), 
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), 
next.getValue().getBrokerId()));
+                        notifyBrokerInActive(next.getKey().getClusterName(), 
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), 
next.getValue().getBrokerId()));
                     log.warn("The broker channel {} expired, brokerInfo {}, 
expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis);
                 }
             }
@@ -102,15 +103,15 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
 
     @Override
     public void registerBroker(String clusterName, String brokerName, String 
brokerAddr,
-                               long brokerId, Long timeoutMillis, Channel 
channel, Integer epoch, Long maxOffset) {
+        long brokerId, Long timeoutMillis, Channel channel, Integer epoch, 
Long maxOffset) {
         final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, 
brokerAddr);
         final BrokerLiveInfo prevBrokerLiveInfo = 
this.brokerLiveTable.put(addrInfo,
-                new BrokerLiveInfo(brokerName,
-                        brokerAddr,
-                        brokerId,
-                        System.currentTimeMillis(),
-                        timeoutMillis == null ? 
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
-                        channel, epoch == null ? -1 : epoch, maxOffset == null 
? -1 : maxOffset));
+            new BrokerLiveInfo(brokerName,
+                brokerAddr,
+                brokerId,
+                System.currentTimeMillis(),
+                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : 
timeoutMillis,
+                channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : 
maxOffset));
         if (prevBrokerLiveInfo == null) {
             log.info("new broker registered, {}, brokerId:{}", addrInfo, 
brokerId);
         }
@@ -127,24 +128,22 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
     }
 
     @Override
-    public void onBrokerHeartbeat(String clusterName, String brokerAddr, 
Integer epoch, Long maxOffset, Long confirmOffset) {
+    public void onBrokerHeartbeat(String clusterName, String brokerAddr, 
Integer epoch, Long maxOffset,
+        Long confirmOffset) {
         BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
         BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
-        int realEpoch = epoch == null ? -1 : epoch;
-        long realMaxOffset = maxOffset == null ? -1 : maxOffset;
-        long realConfirmOffset = confirmOffset == null ? -1 : confirmOffset;
-        if (prev != null) {
-            prev.setLastUpdateTimestamp(System.currentTimeMillis());
-            if (realEpoch > prev.getEpoch()) {
-                prev.setEpoch(realEpoch);
-                prev.setMaxOffset(realMaxOffset);
-                prev.setConfirmOffset(realConfirmOffset);
-            } else if (realEpoch == prev.getEpoch()) {
-                if (realMaxOffset > prev.getMaxOffset()) {
-                    prev.setMaxOffset(realMaxOffset);
-                    prev.setConfirmOffset(realConfirmOffset);
-                }
-            }
+        if (null == prev) {
+            return;
+        }
+        int realEpoch = Optional.ofNullable(epoch).orElse(-1);
+        long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L);
+        long realConfirmOffset = 
Optional.ofNullable(confirmOffset).orElse(-1L);
+
+        prev.setLastUpdateTimestamp(System.currentTimeMillis());
+        if (realEpoch > prev.getEpoch() || (realEpoch == prev.getEpoch() && 
realMaxOffset > prev.getMaxOffset())) {
+            prev.setEpoch(realEpoch);
+            prev.setMaxOffset(realMaxOffset);
+            prev.setConfirmOffset(realConfirmOffset);
         }
     }
 
@@ -156,7 +155,7 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
                 log.info("Channel {} inactive, broker {}, addr:{}, id:{}", 
entry.getValue().getChannel(), entry.getValue().getBrokerName(), 
entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId());
                 addrInfo = entry.getKey();
                 this.executor.submit(() ->
-                        notifyBrokerInActive(entry.getKey().getClusterName(), 
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), 
entry.getValue().getBrokerId()));
+                    notifyBrokerInActive(entry.getKey().getClusterName(), 
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), 
entry.getValue().getBrokerId()));
                 break;
             }
         }

Reply via email to