This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit bf9e6b6e7f51364d9bad1cab61f6380ca5e8ce3f Merge: c5474b33a e2a341e7f Author: RongtongJin <[email protected]> AuthorDate: Wed Feb 1 16:08:24 2023 +0800 Merge branch 'develop' into dledger-controller-brokerId # Conflicts: # controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java .../apache/rocketmq/broker/BrokerController.java | 2 +- .../broker/controller/ReplicasManager.java | 66 +++++- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 8 +- .../broker/processor/PopMessageProcessor.java | 18 +- .../apache/rocketmq/broker/BrokerOuterAPITest.java | 2 +- .../broker/controller/ReplicasManagerTest.java | 1 + .../rocketmq/client/impl/MQClientAPIImpl.java | 6 +- .../consumer/DefaultLitePullConsumerTest.java | 226 ++++++++++++--------- .../org/apache/rocketmq/common/BrokerConfig.java | 10 + .../impl/manager/ReplicasInfoManager.java | 24 ++- pom.xml | 4 +- .../apache/rocketmq/remoting/RemotingClient.java | 2 + .../remoting/netty/NettyRemotingClient.java | 29 ++- ...nSyncStateData.java => BrokerReplicasInfo.java} | 60 +++--- .../statictopic/TopicQueueMappingInfo.java | 16 -- .../rocketmq/tools/admin/DefaultMQAdminExt.java | 4 +- .../tools/admin/DefaultMQAdminExtImpl.java | 4 +- .../apache/rocketmq/tools/admin/MQAdminExt.java | 4 +- .../command/ha/GetSyncStateSetSubCommand.java | 23 ++- 19 files changed, 319 insertions(+), 190 deletions(-) diff --cc broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 83f798474,689e060d8..6975e0fee --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@@ -206,9 -201,13 +206,13 @@@ public class BrokerOuterAPI return addressList; } + public boolean checkAddressReachable(String address) { + return this.remotingClient.isAddressReachable(address); + } + public void updateNameServerAddressList(final String addrs) { String[] addrArray = addrs.split(";"); - List<String> lst = new ArrayList<>(Arrays.asList(addrArray)); + List<String> lst = new ArrayList<String>(Arrays.asList(addrArray)); this.remotingClient.updateNameServerAddressList(lst); } diff --cc broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index 1414778d4,ca7df5691..b8a6156d3 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@@ -128,9 -123,9 +128,10 @@@ public class ReplicasManagerTest when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI); when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS); when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader); + when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true); when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader); when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result); + when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader); replicasManager = new ReplicasManager(brokerController); autoSwitchHAService.init(defaultMessageStore); replicasManager.start(); diff --cc controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 7b57bff9b,1c5a805b6..c9c5e0426 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@@ -315,20 -327,26 +315,26 @@@ public class ReplicasInfoManager if (isContainsBroker(brokerName)) { // If exist broker metadata, just return metadata final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); - final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName); + final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); final Set<String> syncStateSet = syncStateInfo.getSyncStateSet(); final String master = syncStateInfo.getMasterAddress(); - final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>(); - syncStateSet.forEach(replicas -> { - long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(replicas); - inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId)); + final ArrayList<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = new ArrayList<>(); + final ArrayList<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = new ArrayList<>(); + + brokerInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> { + if (syncStateSet.contains(brokerAddress)) { + long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress); + inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id)); + } else { + notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId)); + } }); - final InSyncStateData.InSyncStateSet inSyncState = new InSyncStateData.InSyncStateSet(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncMembers); - inSyncStateData.addInSyncState(brokerName, inSyncState); + final BrokerReplicasInfo.ReplicasInfo inSyncState = new BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas); + brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState); } } - result.setBody(inSyncStateData.encode()); + result.setBody(brokerReplicasInfo.encode()); return result; }
