This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit bf9e6b6e7f51364d9bad1cab61f6380ca5e8ce3f
Merge: c5474b33a e2a341e7f
Author: RongtongJin <[email protected]>
AuthorDate: Wed Feb 1 16:08:24 2023 +0800

    Merge branch 'develop' into dledger-controller-brokerId
    
    # Conflicts:
    #       controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java

 .../apache/rocketmq/broker/BrokerController.java   |   2 +-
 .../broker/controller/ReplicasManager.java         |  66 +++++-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |   8 +-
 .../broker/processor/PopMessageProcessor.java      |  18 +-
 .../apache/rocketmq/broker/BrokerOuterAPITest.java |   2 +-
 .../broker/controller/ReplicasManagerTest.java     |   1 +
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   6 +-
 .../consumer/DefaultLitePullConsumerTest.java      | 226 ++++++++++++---------
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../impl/manager/ReplicasInfoManager.java          |  24 ++-
 pom.xml                                            |   4 +-
 .../apache/rocketmq/remoting/RemotingClient.java   |   2 +
 .../remoting/netty/NettyRemotingClient.java        |  29 ++-
 ...nSyncStateData.java => BrokerReplicasInfo.java} |  60 +++---
 .../statictopic/TopicQueueMappingInfo.java         |  16 --
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   4 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |   4 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   4 +-
 .../command/ha/GetSyncStateSetSubCommand.java      |  23 ++-
 19 files changed, 319 insertions(+), 190 deletions(-)

diff --cc broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 83f798474,689e060d8..6975e0fee
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@@ -206,9 -201,13 +206,13 @@@ public class BrokerOuterAPI 
          return addressList;
      }
  
+     public boolean checkAddressReachable(String address) {
+         return this.remotingClient.isAddressReachable(address);
+     }
+ 
      public void updateNameServerAddressList(final String addrs) {
          String[] addrArray = addrs.split(";");
 -        List<String> lst = new ArrayList<>(Arrays.asList(addrArray));
 +        List<String> lst = new ArrayList<String>(Arrays.asList(addrArray));
          this.remotingClient.updateNameServerAddressList(lst);
      }
  
diff --cc broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 1414778d4,ca7df5691..b8a6156d3
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@@ -128,9 -123,9 +128,10 @@@ public class ReplicasManagerTest 
          when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
          when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
          when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
+         when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
          when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
          when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
 +        when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader);
          replicasManager = new ReplicasManager(brokerController);
          autoSwitchHAService.init(defaultMessageStore);
          replicasManager.start();
diff --cc controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 7b57bff9b,1c5a805b6..c9c5e0426
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@@ -315,20 -327,26 +315,26 @@@ public class ReplicasInfoManager 
              if (isContainsBroker(brokerName)) {
                  // If exist broker metadata, just return metadata
                  final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
 -                final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
 +                final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
                  final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
                  final String master = syncStateInfo.getMasterAddress();
-                 final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>();
-                 syncStateSet.forEach(replicas -> {
-                     long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(replicas);
-                     inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId));
+                 final ArrayList<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = new ArrayList<>();
+                 final ArrayList<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = new ArrayList<>();
+ 
+                 brokerInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> {
+                     if (syncStateSet.contains(brokerAddress)) {
+                         long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress);
+                         inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id));
+                     } else {
+                         notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId));
+                     }
                  });
  
-                 final InSyncStateData.InSyncStateSet inSyncState = new InSyncStateData.InSyncStateSet(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncMembers);
-                 inSyncStateData.addInSyncState(brokerName, inSyncState);
+                 final BrokerReplicasInfo.ReplicasInfo inSyncState = new BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas);
+                 brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState);
              }
          }
-         result.setBody(inSyncStateData.encode());
+         result.setBody(brokerReplicasInfo.encode());
          return result;
      }
  

Reply via email to