This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c1bc01263e7675ed0506e38c4b09924e63957e9e
Author: TheR1sing3un <[email protected]>
AuthorDate: Sun Feb 5 14:45:46 2023 +0800

    feat(controller): implement the general register to controller protocol in 
controller side
    
    1. implement the general register to controller protocol in controller
    side
---
 .../broker/controller/ReplicasManager.java         |   2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |   4 +-
 .../org/apache/rocketmq/controller/Controller.java |  11 +-
 .../controller/impl/DLedgerController.java         |  36 +++--
 .../rocketmq/controller/impl/event/EventType.java  |   4 +-
 .../impl/event/UpdateBrokerAddressEvent.java       |  45 +++---
 .../controller/impl/manager/BrokerReplicaInfo.java |  12 +-
 .../impl/manager/ReplicasInfoManager.java          | 157 ++++++++++++++++-----
 .../processor/ControllerRequestProcessor.java      |  31 ++++
 .../controller/impl/DLedgerControllerTest.java     |   6 +-
 .../register/ApplyBrokerIdRequestHeader.java       |  16 +++
 .../register/ApplyBrokerIdResponseHeader.java      |   1 -
 .../register/GetNextBrokerIdRequestHeader.java     |   8 ++
 .../register/GetNextBrokerIdResponseHeader.java    |   8 ++
 .../RegisterBrokerToControllerRequestHeader.java   |  12 +-
 .../register/RegisterSuccessRequestHeader.java     |  16 +++
 .../register/RegisterSuccessResponseHeader.java    |  36 +++++
 17 files changed, 331 insertions(+), 74 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index e3c9382f8..b2b4a9163 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -369,7 +369,7 @@ public class ReplicasManager {
         // Register this broker to controller to get a stable and credible 
broker id, and persist metadata to local file.
         try {
             final RegisterBrokerToControllerResponseHeader registerResponse = 
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-                this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.localAddress, 
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+                this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, 
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
                 this.haService.getLastEpoch(), 
this.brokerController.getMessageStore().getMaxPhyOffset(), 
this.brokerConfig.getBrokerElectionPriority());
             final String newMasterAddress = 
registerResponse.getMasterAddress();
             if (StringUtils.isNoneEmpty(newMasterAddress)) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 82e9ea33e..054f1edaa 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -1202,10 +1202,10 @@ public class BrokerOuterAPI {
      */
     public RegisterBrokerToControllerResponseHeader registerBrokerToController(
         final String controllerAddress, final String clusterName,
-        final String brokerName, final String address, final long 
controllerHeartbeatTimeoutMills, final int epoch,
+        final String brokerName, final String brokerAddress, final Long 
brokerId, final long controllerHeartbeatTimeoutMills, final int epoch,
         final long maxOffset, final int electionPriority) throws Exception {
 
-        final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, 
controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority);
+        final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress, 
brokerId, controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java 
b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 963f35058..5c0402dad 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -26,7 +26,10 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSet
 import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
 
 /**
  * The api for controller
@@ -81,7 +84,13 @@ public interface Controller {
      * @param request RegisterBrokerRequest
      * @return RemotingCommand(RegisterBrokerResponseHeader)
      */
-    CompletableFuture<RemotingCommand> registerBroker(final 
RegisterBrokerToControllerRequestHeader request);
+    // CompletableFuture<RemotingCommand> registerBroker(final 
RegisterBrokerToControllerRequestHeader request);
+
+    CompletableFuture<RemotingCommand> getNextBrokerId(final 
GetNextBrokerIdRequestHeader request);
+
+    CompletableFuture<RemotingCommand> applyBrokerId(final 
ApplyBrokerIdRequestHeader request);
+
+    CompletableFuture<RemotingCommand> registerSuccess(final 
RegisterSuccessRequestHeader request);
 
     /**
      * Get the Replica Info for a target broker.
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 3f0aef746..58870cce1 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -34,7 +34,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ServiceThread;
@@ -43,6 +42,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
+import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.event.EventSerializer;
@@ -62,7 +62,9 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanContro
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
 
 /**
  * The implementation of controller, based on DLedger (raft).
@@ -78,19 +80,20 @@ public class DLedgerController implements Controller {
     private final EventSerializer eventSerializer;
     private final RoleChangeHandler roleHandler;
     private final DLedgerControllerStateMachine statemachine;
+
     // Usr for checking whether the broker is alive
-    private BiPredicate<String, String> brokerAlivePredicate;
+    private BrokerValidPredicate brokerAlivePredicate;
     // use for elect a master
     private ElectPolicy electPolicy;
 
     private AtomicBoolean isScheduling = new AtomicBoolean(false);
 
-    public DLedgerController(final ControllerConfig config, final 
BiPredicate<String, String> brokerAlivePredicate) {
+    public DLedgerController(final ControllerConfig config, final 
BrokerValidPredicate brokerAlivePredicate) {
         this(config, brokerAlivePredicate, null, null, null, null);
     }
 
     public DLedgerController(final ControllerConfig controllerConfig,
-        final BiPredicate<String, String> brokerAlivePredicate, final 
NettyServerConfig nettyServerConfig,
+        final BrokerValidPredicate brokerAlivePredicate, final 
NettyServerConfig nettyServerConfig,
         final NettyClientConfig nettyClientConfig, final ChannelEventListener 
channelEventListener,
         final ElectPolicy electPolicy) {
         this.controllerConfig = controllerConfig;
@@ -163,10 +166,25 @@ public class DLedgerController implements Controller {
             () -> this.replicasInfoManager.electMaster(request, 
this.electPolicy), true);
     }
 
+//    @Override
+//    public CompletableFuture<RemotingCommand> 
registerBroker(RegisterBrokerToControllerRequestHeader request) {
+//        return this.scheduler.appendEvent("registerBroker",
+//            () -> this.replicasInfoManager.registerBroker(request, 
brokerAlivePredicate), true);
+//    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> 
getNextBrokerId(GetNextBrokerIdRequestHeader request) {
+        return this.scheduler.appendEvent("getNextBrokerId", () -> 
this.replicasInfoManager.getNextBrokerId(request), false);
+    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> 
applyBrokerId(ApplyBrokerIdRequestHeader request) {
+        return this.scheduler.appendEvent("applyBrokerId", () -> 
this.replicasInfoManager.applyBrokerId(request), true);
+    }
+
     @Override
-    public CompletableFuture<RemotingCommand> 
registerBroker(RegisterBrokerToControllerRequestHeader request) {
-        return this.scheduler.appendEvent("registerBroker",
-            () -> this.replicasInfoManager.registerBroker(request, 
brokerAlivePredicate), true);
+    public CompletableFuture<RemotingCommand> 
registerSuccess(RegisterSuccessRequestHeader request) {
+        return this.scheduler.appendEvent("registerSuccess", () -> 
this.replicasInfoManager.registerSuccess(request, brokerAlivePredicate), true);
     }
 
     @Override
@@ -229,7 +247,7 @@ public class DLedgerController implements Controller {
         return this.dLedgerServer.getMemberState();
     }
 
-    public void setBrokerAlivePredicate(BiPredicate<String, String> 
brokerAlivePredicate) {
+    public void setBrokerAlivePredicate(BrokerValidPredicate 
brokerAlivePredicate) {
         this.brokerAlivePredicate = brokerAlivePredicate;
     }
 
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
index 6f100438e..29aacf7a6 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
@@ -24,7 +24,9 @@ public enum EventType {
     APPLY_BROKER_ID_EVENT("ApplyBrokerIdEvent", (short) 2),
     ELECT_MASTER_EVENT("ElectMasterEvent", (short) 3),
     READ_EVENT("ReadEvent", (short) 4),
-    CLEAN_BROKER_DATA_EVENT("CleanBrokerDataEvent", (short) 5);
+    CLEAN_BROKER_DATA_EVENT("CleanBrokerDataEvent", (short) 5),
+
+    UPDATE_BROKER_ADDRESS("UpdateBrokerAddressEvent", (short) 6);
 
     private final String name;
     private final short id;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/UpdateBrokerAddressEvent.java
similarity index 57%
copy from 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
copy to 
controller/src/main/java/org/apache/rocketmq/controller/impl/event/UpdateBrokerAddressEvent.java
index ddec9b0ea..d40121ee0 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/UpdateBrokerAddressEvent.java
@@ -15,40 +15,53 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.protocol.header.controller.register;
+package org.apache.rocketmq.controller.impl.event;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetNextBrokerIdResponseHeader implements CommandCustomHeader {
+public class UpdateBrokerAddressEvent implements EventMessage {
 
     private String clusterName;
 
     private String brokerName;
 
-    private Long nextBrokerId;
+    private String brokerAddress;
+
+    private Long brokerId;
 
-    public GetNextBrokerIdResponseHeader(String clusterName, String 
brokerName, Long nextBrokerId) {
+    public UpdateBrokerAddressEvent(String clusterName, String brokerName, 
String brokerAddress, Long brokerId) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.nextBrokerId = nextBrokerId;
+        this.brokerAddress = brokerAddress;
+        this.brokerId = brokerId;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
     }
 
     @Override
     public String toString() {
-        return "GetNextBrokerIdResponseHeader{" +
+        return "UpdateBrokerAddressEvent{" +
                 "clusterName='" + clusterName + '\'' +
                 ", brokerName='" + brokerName + '\'' +
-                ", nextBrokerId=" + nextBrokerId +
+                ", brokerAddress='" + brokerAddress + '\'' +
+                ", brokerId=" + brokerId +
                 '}';
     }
 
     @Override
-    public void checkFields() throws RemotingCommandException {
-
-    }
-
-    public Long getNextBrokerId() {
-        return nextBrokerId;
+    public EventType getEventType() {
+        return EventType.UPDATE_BROKER_ADDRESS;
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index bc60c8b54..abfaf275c 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -47,8 +47,8 @@ public class BrokerReplicaInfo {
         this.brokerIdInfo.remove(brokerId);
     }
 
-    public long newBrokerId() {
-        return this.nextAssignBrokerId.getAndIncrement();
+    public Long getNextAssignBrokerId() {
+        return nextAssignBrokerId.get();
     }
 
     public String getClusterName() {
@@ -61,6 +61,7 @@ public class BrokerReplicaInfo {
 
     public void addBroker(final Long brokerId, final String ipAddress, final 
String registerCheckCode) {
         this.brokerIdInfo.put(brokerId, new Pair<>(ipAddress, 
registerCheckCode));
+        this.nextAssignBrokerId.incrementAndGet();
     }
 
     public boolean isBrokerExist(final Long brokerId) {
@@ -85,4 +86,11 @@ public class BrokerReplicaInfo {
         }
         return null;
     }
+
+    public String getBrokerRegisterCheckCode(final Long brokerId) {
+        if (this.brokerIdInfo.containsKey(brokerId)) {
+            return this.brokerIdInfo.get(brokerId).getObject2();
+        }
+        return null;
+    }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a5ab46bf7..7eca573cf 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -39,6 +39,7 @@ import 
org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.event.EventType;
+import org.apache.rocketmq.controller.impl.event.UpdateBrokerAddressEvent;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -52,8 +53,16 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterReques
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
+
+import javax.naming.ldap.Control;
 
 /**
  * The manager that manages the replicas info for all brokers. We can think of 
this class as the controller's memory
@@ -245,46 +254,127 @@ public class ReplicasInfoManager {
         return null;
     }
 
-    public ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerBroker(
-            final RegisterBrokerToControllerRequestHeader request, final 
BiPredicate<String, String> brokerAlivePredicate) {
-        String brokerAddress = request.getBrokerAddress();
-        final String brokerName = request.getBrokerName();
+//    public ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerBroker(
+//            final RegisterBrokerToControllerRequestHeader request, final 
BrokerValidPredicate alivePredicate) {
+//        String brokerAddress = request.getBrokerAddress();
+//        final String brokerName = request.getBrokerName();
+//        final String clusterName = request.getClusterName();
+//        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
+//        final RegisterBrokerToControllerResponseHeader response = 
result.getResponse();
+//        if (!isContainsBroker(brokerName)) {
+//            
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, 
"Broker-set hasn't been registered in controller");
+//            return result;
+//        }
+//        final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+//        final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
+//        if (brokerReplicaInfo.isBrokerExist())
+//
+//        // If the broker's metadata does not exist in the state machine, we 
can assign the broker a brokerId valued 1
+//        // By default, we set this variable to a value of 1
+//        long brokerId = MixAll.FIRST_SLAVE_ID;
+//        boolean shouldApplyBrokerId = true;
+//        if (isContainsBroker(brokerName)) {
+//            final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
+//            final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+//
+//            if (brokerReplicaInfo.isBrokerExist(brokerAddress)) {
+//                // this broker have registered
+//                brokerId = brokerReplicaInfo.getBrokerId(brokerAddress);
+//                shouldApplyBrokerId = false;
+//            } else {
+//                // If this broker replicas is first time come online, we 
need to apply a new id for this replicas.
+//                brokerId = brokerReplicaInfo.newBrokerId();
+//            }
+//
+//            if (syncStateInfo.isMasterExist() && 
brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
+//                // If the master is alive, just return master info.
+//                final String masterAddress = 
syncStateInfo.getMasterAddress();
+//                response.setMasterAddress(masterAddress);
+//                response.setMasterEpoch(syncStateInfo.getMasterEpoch());
+//                
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
+//            }
+//        }
+//
+//        response.setBrokerId(brokerId);
+//        if (response.getMasterAddress() == null) {
+//            response.setMasterAddress("");
+//        }
+//        if (shouldApplyBrokerId) {
+//            final ApplyBrokerIdEvent applyIdEvent = new 
ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, 
brokerId);
+//            result.addEvent(applyIdEvent);
+//        }
+//        return result;
+//    }
+
+    public ControllerResult<GetNextBrokerIdResponseHeader> 
getNextBrokerId(final GetNextBrokerIdRequestHeader request) {
         final String clusterName = request.getClusterName();
-        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
-        final RegisterBrokerToControllerResponseHeader response = 
result.getResponse();
-        // If the broker's metadata does not exist in the state machine, we 
can assign the broker a brokerId valued 1
-        // By default, we set this variable to a value of 1
-        long brokerId = MixAll.FIRST_SLAVE_ID;
-        boolean shouldApplyBrokerId = true;
-        if (isContainsBroker(brokerName)) {
-            final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
-            final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+        final String brokerName = request.getBrokerName();
+        BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+        final ControllerResult<GetNextBrokerIdResponseHeader> result = new 
ControllerResult<>(new GetNextBrokerIdResponseHeader(clusterName, brokerName));
+        final GetNextBrokerIdResponseHeader response = result.getResponse();
+        if (brokerReplicaInfo == null) {
+            // means that none of brokers in this broker-set are registered
+            response.setNextBrokerId(MixAll.FIRST_SLAVE_ID);
+        } else {
+            
response.setNextBrokerId(brokerReplicaInfo.getNextAssignBrokerId());
+        }
+        return result;
+    }
 
-            if (brokerReplicaInfo.isBrokerExist(brokerAddress)) {
-                // this broker have registered
-                brokerId = brokerReplicaInfo.getBrokerId(brokerAddress);
-                shouldApplyBrokerId = false;
+    public ControllerResult<ApplyBrokerIdResponseHeader> applyBrokerId(final 
ApplyBrokerIdRequestHeader request) {
+        final String clusterName = request.getClusterName();
+        final String brokerName = request.getBrokerName();
+        final Long brokerId = request.getAppliedBrokerId();
+        final String registerCheckCode = request.getRegisterCheckCode();
+        final String brokerAddress = registerCheckCode.split(";")[0];
+        BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+        final ControllerResult<ApplyBrokerIdResponseHeader> result = new 
ControllerResult<>(new ApplyBrokerIdResponseHeader(clusterName, brokerName));
+        final ApplyBrokerIdEvent event = new ApplyBrokerIdEvent(clusterName, 
brokerName, brokerAddress, brokerId, registerCheckCode);
+        // broker-set unregistered
+        if (brokerReplicaInfo == null) {
+            // first brokerId
+            if (brokerId == MixAll.FIRST_SLAVE_ID) {
+                result.addEvent(event);
             } else {
-                // If this broker replicas is first time come online, we need 
to apply a new id for this replicas.
-                brokerId = brokerReplicaInfo.newBrokerId();
-            }
-
-            if (syncStateInfo.isMasterExist() && 
brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
-                // If the master is alive, just return master info.
-                final String masterAddress = syncStateInfo.getMasterAddress();
-                response.setMasterAddress(masterAddress);
-                response.setMasterEpoch(syncStateInfo.getMasterEpoch());
-                
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
+                
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_ID_INVALID, 
String.format("Broker-set: %s hasn't been registered in controller, but broker 
try to apply brokerId: %d", brokerName, brokerId));
             }
+            return result;
+        }
+        // broker-set registered
+        if (!brokerReplicaInfo.isBrokerExist(brokerId) || 
registerCheckCode.equals(brokerReplicaInfo.getBrokerRegisterCheckCode(brokerId)))
 {
+            // if brokerId hasn't been assigned or brokerId was assigned to 
this broker
+            return result;
         }
+        result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_ID_INVALID, 
String.format("Fail to apply brokerId: %d in broker-set: %s", brokerId, 
brokerName));
+        return result;
+    }
 
-        response.setBrokerId(brokerId);
-        if (response.getMasterAddress() == null) {
-            response.setMasterAddress("");
+    public ControllerResult<RegisterSuccessResponseHeader> 
registerSuccess(final RegisterSuccessRequestHeader request, final 
BrokerValidPredicate alivePredicate) {
+        final String brokerAddress = request.getBrokerAddress();
+        final String brokerName = request.getBrokerName();
+        final String clusterName = request.getClusterName();
+        final Long brokerId = request.getBrokerId();
+        final ControllerResult<RegisterSuccessResponseHeader> result = new 
ControllerResult<>(new RegisterSuccessResponseHeader(clusterName, brokerName));
+        final RegisterSuccessResponseHeader response = result.getResponse();
+        if (!isContainsBroker(brokerName)) {
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, 
String.format("Broker-set: %s hasn't been registered in controller", 
brokerName));
+            return result;
+        }
+        final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
+        final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
+        if (!brokerReplicaInfo.isBrokerExist(brokerId)) {
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, 
String.format("BrokerId: %d hasn't been registered in broker-set: %s", 
brokerId, brokerName));
+            return result;
+        }
+        if (syncStateInfo.isMasterExist() && alivePredicate.check(clusterName, 
brokerName, syncStateInfo.getMasterBrokerId())) {
+            // if master still exist
+            response.setMasterBrokerId(syncStateInfo.getMasterBrokerId());
+            
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(response.getMasterBrokerId()));
         }
-        if (shouldApplyBrokerId) {
-            final ApplyBrokerIdEvent applyIdEvent = new 
ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, 
brokerId);
-            result.addEvent(applyIdEvent);
+        // if this broker's address has been changed, we need to update it
+        if 
(!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) {
+            final UpdateBrokerAddressEvent event = new 
UpdateBrokerAddressEvent(clusterName, brokerName, brokerAddress, brokerId);
+            result.addEvent(event);
         }
         return result;
     }
@@ -487,4 +577,5 @@ public class ReplicasInfoManager {
     private boolean isContainsBroker(final String brokerName) {
         return this.replicaInfoTable.containsKey(brokerName) && 
this.syncStateSetInfoTable.containsKey(brokerName);
     }
+
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 6dea0e1d7..da1de6ef1 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.controller.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import java.io.UnsupportedEncodingException;
+import java.sql.Time;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,9 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
@@ -206,6 +210,33 @@ public class ControllerRequestProcessor implements 
NettyRequestProcessor {
         return RemotingCommand.createResponseCommand(null);
     }
 
+    private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = 
(GetNextBrokerIdRequestHeader) 
request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getNextBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = 
(ApplyBrokerIdRequestHeader) 
request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().applyBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleRegisterSuccess(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        RegisterSuccessRequestHeader requestHeader = 
(RegisterSuccessRequestHeader) 
request.decodeCommandCustomHeader(RegisterSuccessRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().registerSuccess(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
     private RemotingCommand handleUpdateControllerConfig(ChannelHandlerContext 
ctx, RemotingCommand request) {
         if (ctx != null) {
             log.info("updateConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 239094c29..9cfd1146e 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -73,7 +73,7 @@ public class DLedgerControllerTest {
         config.setMappedFileSize(10 * 1024 * 1024);
         config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
 
-        final DLedgerController controller = new DLedgerController(config, 
(str1, str2) -> true);
+        final DLedgerController controller = new DLedgerController(config, 
(str1, str2, str3) -> true);
 
         controller.startup();
         return controller;
@@ -198,7 +198,7 @@ public class DLedgerControllerTest {
     }
 
     public void setBrokerAlivePredicate(DLedgerController controller, 
String... deathBroker) {
-        controller.setBrokerAlivePredicate((clusterName, brokerAddress) -> {
+        controller.setBrokerAlivePredicate((clusterName, brokerName, 
brokerAddress) -> {
             for (String broker : deathBroker) {
                 if (broker.equals(brokerAddress)) {
                     return false;
@@ -209,7 +209,7 @@ public class DLedgerControllerTest {
     }
 
     public void setBrokerElectPolicy(DLedgerController controller, String... 
deathBroker) {
-        controller.setElectPolicy(new DefaultElectPolicy((clusterName, 
brokerAddress) -> {
+        controller.setElectPolicy(new DefaultElectPolicy((clusterName, 
brokerName, brokerAddress) -> {
             for (String broker : deathBroker) {
                 if (broker.equals(brokerAddress)) {
                     return false;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
index 780f519a7..bfc103eb3 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
@@ -41,4 +41,20 @@ public class ApplyBrokerIdRequestHeader implements 
CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
 
     }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public Long getAppliedBrokerId() {
+        return appliedBrokerId;
+    }
+
+    public String getRegisterCheckCode() {
+        return registerCheckCode;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
index d83164747..1221c206d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
@@ -27,7 +27,6 @@ public class ApplyBrokerIdResponseHeader implements 
CommandCustomHeader {
     private String brokerName;
 
 
-
     public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
index eee82a8f4..90361ff74 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
@@ -43,4 +43,12 @@ public class GetNextBrokerIdRequestHeader implements 
CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
 
     }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
index ddec9b0ea..3fece1768 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
@@ -28,6 +28,10 @@ public class GetNextBrokerIdResponseHeader implements 
CommandCustomHeader {
 
     private Long nextBrokerId;
 
+    public GetNextBrokerIdResponseHeader(String clusterName, String 
brokerName) {
+        this(clusterName, brokerName, null);
+    }
+
     public GetNextBrokerIdResponseHeader(String clusterName, String 
brokerName, Long nextBrokerId) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
@@ -48,6 +52,10 @@ public class GetNextBrokerIdResponseHeader implements 
CommandCustomHeader {
 
     }
 
+    public void setNextBrokerId(Long nextBrokerId) {
+        this.nextBrokerId = nextBrokerId;
+    }
+
     public Long getNextBrokerId() {
         return nextBrokerId;
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java
index f67df18b6..f36b22170 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java
@@ -24,6 +24,7 @@ public class RegisterBrokerToControllerRequestHeader 
implements CommandCustomHea
     private String clusterName;
     private String brokerName;
     private String brokerAddress;
+    private Long brokerId;
     @CFNullable
     private Integer epoch;
     @CFNullable
@@ -38,20 +39,21 @@ public class RegisterBrokerToControllerRequestHeader 
implements CommandCustomHea
     public RegisterBrokerToControllerRequestHeader() {
     }
 
-    public RegisterBrokerToControllerRequestHeader(String clusterName, String 
brokerName, String brokerAddress) {
-        this(clusterName, brokerName, brokerAddress, 0);
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String 
brokerName, String brokerAddress, Long brokerId) {
+        this(clusterName, brokerName, brokerAddress, brokerId, 0);
     }
 
-    public RegisterBrokerToControllerRequestHeader(String clusterName, String 
brokerName, String brokerAddress,
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String 
brokerName, String brokerAddress, Long brokerId,
         int electionPriority) {
-        this(clusterName, brokerName, brokerAddress, null, 0, 0, 
electionPriority);
+        this(clusterName, brokerName, brokerAddress, brokerId, null, 0, 0, 
electionPriority);
     }
 
     public RegisterBrokerToControllerRequestHeader(String clusterName, String 
brokerName, String brokerAddress,
-        Long heartbeatTimeoutMillis, int epoch, long maxOffset, int 
electionPriority) {
+        Long brokerId, Long heartbeatTimeoutMillis, int epoch, long maxOffset, 
int electionPriority) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
         this.brokerAddress = brokerAddress;
+        this.brokerId = brokerId;
         this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
         this.epoch = epoch;
         this.maxOffset = maxOffset;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
index 721509ac9..db5808d6d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
@@ -41,4 +41,20 @@ public class RegisterSuccessRequestHeader implements 
CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
 
     }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
index 953190480..61e5d8ea1 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
@@ -22,9 +22,45 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class RegisterSuccessResponseHeader implements CommandCustomHeader {
 
+    private String clusterName;
+
+    private String brokerName;
+
+    private Long masterBrokerId;
+
+    private String masterAddress;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
     }
 
+    public RegisterSuccessResponseHeader(String clusterName, String 
brokerName) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+    }
+
+    public void setMasterBrokerId(Long masterBrokerId) {
+        this.masterBrokerId = masterBrokerId;
+    }
+
+    public void setMasterAddress(String masterAddress) {
+        this.masterAddress = masterAddress;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
+    }
+
+    public String getMasterAddress() {
+        return masterAddress;
+    }
 }


Reply via email to