This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 56578f99fbde9c759d095ef5fea24ab1f5d655de Author: TheR1sing3un <[email protected]> AuthorDate: Sun Feb 5 14:54:05 2023 +0800 feat(controller): implement logic about dealing with UpdateBrokerAddress event 1. implement logic about dealing with UpdateBrokerAddress event --- .../controller/impl/manager/BrokerReplicaInfo.java | 7 +++++++ .../controller/impl/manager/ReplicasInfoManager.java | 18 ++++++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java index abfaf275c..24e67bf1e 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java @@ -93,4 +93,11 @@ public class BrokerReplicaInfo { } return null; } + + public void updateBrokerAddress(final Long brokerId, final String brokerAddress) { + Pair<String, String> oldPair = this.brokerIdInfo.get(brokerId); + if (oldPair != null) { + this.brokerIdInfo.put(brokerId, new Pair<>(brokerAddress, oldPair.getObject2())); + } + } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 7eca573cf..1400932e0 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiPredicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,12 +56,9 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBro import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; -import javax.naming.ldap.Control; /** * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory @@ -484,6 +480,9 @@ public class ReplicasInfoManager { case CLEAN_BROKER_DATA_EVENT: handleCleanBrokerDataEvent((CleanBrokerDataEvent) event); break; + case UPDATE_BROKER_ADDRESS: + handleUpdateBrokerAddress((UpdateBrokerAddressEvent) event); + break; default: break; } @@ -509,8 +508,7 @@ public class ReplicasInfoManager { // Initialize the replicaInfo about this broker set final String clusterName = event.getClusterName(); final BrokerReplicaInfo brokerReplicaInfo = new BrokerReplicaInfo(clusterName, brokerName); - long brokerId = brokerReplicaInfo.newBrokerId(); - brokerReplicaInfo.addBroker(brokerId, event.getBrokerAddress(), event.getRegisterCheckCode()); + brokerReplicaInfo.addBroker(event.getNewBrokerId(), event.getBrokerAddress(), event.getRegisterCheckCode()); this.replicaInfoTable.put(brokerName, brokerReplicaInfo); final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName); // Initialize an empty syncStateInfo for this broker set @@ -518,6 +516,14 @@ public class ReplicasInfoManager { } } + private void handleUpdateBrokerAddress(final UpdateBrokerAddressEvent event) { + final String brokerName = event.getBrokerName(); + final String brokerAddress = event.getBrokerAddress(); + final Long brokerId = event.getBrokerId(); + BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); + brokerReplicaInfo.updateBrokerAddress(brokerId, brokerAddress); + } + private void handleElectMaster(final ElectMasterEvent event) { final String brokerName = event.getBrokerName(); final Long newMaster = event.getNewMasterBrokerId();
