This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit c1bc01263e7675ed0506e38c4b09924e63957e9e Author: TheR1sing3un <[email protected]> AuthorDate: Sun Feb 5 14:45:46 2023 +0800 feat(controller): implement the general register to controller protocol in controller side 1. implement the general register to controller protocol in controller side --- .../broker/controller/ReplicasManager.java | 2 +- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 4 +- .../org/apache/rocketmq/controller/Controller.java | 11 +- .../controller/impl/DLedgerController.java | 36 +++-- .../rocketmq/controller/impl/event/EventType.java | 4 +- .../impl/event/UpdateBrokerAddressEvent.java | 45 +++--- .../controller/impl/manager/BrokerReplicaInfo.java | 12 +- .../impl/manager/ReplicasInfoManager.java | 157 ++++++++++++++++----- .../processor/ControllerRequestProcessor.java | 31 ++++ .../controller/impl/DLedgerControllerTest.java | 6 +- .../register/ApplyBrokerIdRequestHeader.java | 16 +++ .../register/ApplyBrokerIdResponseHeader.java | 1 - .../register/GetNextBrokerIdRequestHeader.java | 8 ++ .../register/GetNextBrokerIdResponseHeader.java | 8 ++ .../RegisterBrokerToControllerRequestHeader.java | 12 +- .../register/RegisterSuccessRequestHeader.java | 16 +++ .../register/RegisterSuccessResponseHeader.java | 36 +++++ 17 files changed, 331 insertions(+), 74 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index e3c9382f8..b2b4a9163 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -369,7 +369,7 @@ public class ReplicasManager { // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file. try { final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress, - this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(), + this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, this.brokerConfig.getControllerHeartBeatTimeoutMills(), this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority()); final String newMasterAddress = registerResponse.getMasterAddress(); if (StringUtils.isNoneEmpty(newMasterAddress)) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 82e9ea33e..054f1edaa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -1202,10 +1202,10 @@ public class BrokerOuterAPI { */ public RegisterBrokerToControllerResponseHeader registerBrokerToController( final String controllerAddress, final String clusterName, - final String brokerName, final String address, final long controllerHeartbeatTimeoutMills, final int epoch, + final String brokerName, final String brokerAddress, final Long brokerId, final long controllerHeartbeatTimeoutMills, final int epoch, final long maxOffset, final int electionPriority) throws Exception { - final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority); + final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress, brokerId, controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); assert response != null; diff --git a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java index 963f35058..5c0402dad 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java @@ -26,7 +26,10 @@ import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSet import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; /** * The api for controller @@ -81,7 +84,13 @@ public interface Controller { * @param request RegisterBrokerRequest * @return RemotingCommand(RegisterBrokerResponseHeader) */ - CompletableFuture<RemotingCommand> registerBroker(final RegisterBrokerToControllerRequestHeader request); + // CompletableFuture<RemotingCommand> registerBroker(final RegisterBrokerToControllerRequestHeader request); + + CompletableFuture<RemotingCommand> getNextBrokerId(final GetNextBrokerIdRequestHeader request); + + CompletableFuture<RemotingCommand> applyBrokerId(final ApplyBrokerIdRequestHeader request); + + CompletableFuture<RemotingCommand> registerSuccess(final RegisterSuccessRequestHeader request); /** * Get the Replica Info for a target broker. diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java index 3f0aef746..58870cce1 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java @@ -34,7 +34,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.ServiceThread; @@ -43,6 +42,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.controller.Controller; import org.apache.rocketmq.controller.elect.ElectPolicy; import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; +import org.apache.rocketmq.controller.helper.BrokerValidPredicate; import org.apache.rocketmq.controller.impl.event.ControllerResult; import org.apache.rocketmq.controller.impl.event.EventMessage; import org.apache.rocketmq.controller.impl.event.EventSerializer; @@ -62,7 +62,9 @@ import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanContro import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; /** * The implementation of controller, based on DLedger (raft). @@ -78,19 +80,20 @@ public class DLedgerController implements Controller { private final EventSerializer eventSerializer; private final RoleChangeHandler roleHandler; private final DLedgerControllerStateMachine statemachine; + // Usr for checking whether the broker is alive - private BiPredicate<String, String> brokerAlivePredicate; + private BrokerValidPredicate brokerAlivePredicate; // use for elect a master private ElectPolicy electPolicy; private AtomicBoolean isScheduling = new AtomicBoolean(false); - public DLedgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) { + public DLedgerController(final ControllerConfig config, final BrokerValidPredicate brokerAlivePredicate) { this(config, brokerAlivePredicate, null, null, null, null); } public DLedgerController(final ControllerConfig controllerConfig, - final BiPredicate<String, String> brokerAlivePredicate, final NettyServerConfig nettyServerConfig, + final BrokerValidPredicate brokerAlivePredicate, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener, final ElectPolicy electPolicy) { this.controllerConfig = controllerConfig; @@ -163,10 +166,25 @@ public class DLedgerController implements Controller { () -> this.replicasInfoManager.electMaster(request, this.electPolicy), true); } +// @Override +// public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerToControllerRequestHeader request) { +// return this.scheduler.appendEvent("registerBroker", +// () -> this.replicasInfoManager.registerBroker(request, brokerAlivePredicate), true); +// } + + @Override + public CompletableFuture<RemotingCommand> getNextBrokerId(GetNextBrokerIdRequestHeader request) { + return this.scheduler.appendEvent("getNextBrokerId", () -> this.replicasInfoManager.getNextBrokerId(request), false); + } + + @Override + public CompletableFuture<RemotingCommand> applyBrokerId(ApplyBrokerIdRequestHeader request) { + return this.scheduler.appendEvent("applyBrokerId", () -> this.replicasInfoManager.applyBrokerId(request), true); + } + @Override - public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerToControllerRequestHeader request) { - return this.scheduler.appendEvent("registerBroker", - () -> this.replicasInfoManager.registerBroker(request, brokerAlivePredicate), true); + public CompletableFuture<RemotingCommand> registerSuccess(RegisterSuccessRequestHeader request) { + return this.scheduler.appendEvent("registerSuccess", () -> this.replicasInfoManager.registerSuccess(request, brokerAlivePredicate), true); } @Override @@ -229,7 +247,7 @@ public class DLedgerController implements Controller { return this.dLedgerServer.getMemberState(); } - public void setBrokerAlivePredicate(BiPredicate<String, String> brokerAlivePredicate) { + public void setBrokerAlivePredicate(BrokerValidPredicate brokerAlivePredicate) { this.brokerAlivePredicate = brokerAlivePredicate; } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java index 6f100438e..29aacf7a6 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java @@ -24,7 +24,9 @@ public enum EventType { APPLY_BROKER_ID_EVENT("ApplyBrokerIdEvent", (short) 2), ELECT_MASTER_EVENT("ElectMasterEvent", (short) 3), READ_EVENT("ReadEvent", (short) 4), - CLEAN_BROKER_DATA_EVENT("CleanBrokerDataEvent", (short) 5); + CLEAN_BROKER_DATA_EVENT("CleanBrokerDataEvent", (short) 5), + + UPDATE_BROKER_ADDRESS("UpdateBrokerAddressEvent", (short) 6); private final String name; private final short id; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/UpdateBrokerAddressEvent.java similarity index 57% copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java copy to controller/src/main/java/org/apache/rocketmq/controller/impl/event/UpdateBrokerAddressEvent.java index ddec9b0ea..d40121ee0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/UpdateBrokerAddressEvent.java @@ -15,40 +15,53 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.protocol.header.controller.register; +package org.apache.rocketmq.controller.impl.event; -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class GetNextBrokerIdResponseHeader implements CommandCustomHeader { +public class UpdateBrokerAddressEvent implements EventMessage { private String clusterName; private String brokerName; - private Long nextBrokerId; + private String brokerAddress; + + private Long brokerId; - public GetNextBrokerIdResponseHeader(String clusterName, String brokerName, Long nextBrokerId) { + public UpdateBrokerAddressEvent(String clusterName, String brokerName, String brokerAddress, Long brokerId) { this.clusterName = clusterName; this.brokerName = brokerName; - this.nextBrokerId = nextBrokerId; + this.brokerAddress = brokerAddress; + this.brokerId = brokerId; + } + + public String getClusterName() { + return clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public String getBrokerAddress() { + return brokerAddress; + } + + public Long getBrokerId() { + return brokerId; } @Override public String toString() { - return "GetNextBrokerIdResponseHeader{" + + return "UpdateBrokerAddressEvent{" + "clusterName='" + clusterName + '\'' + ", brokerName='" + brokerName + '\'' + - ", nextBrokerId=" + nextBrokerId + + ", brokerAddress='" + brokerAddress + '\'' + + ", brokerId=" + brokerId + '}'; } @Override - public void checkFields() throws RemotingCommandException { - - } - - public Long getNextBrokerId() { - return nextBrokerId; + public EventType getEventType() { + return EventType.UPDATE_BROKER_ADDRESS; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java index bc60c8b54..abfaf275c 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java @@ -47,8 +47,8 @@ public class BrokerReplicaInfo { this.brokerIdInfo.remove(brokerId); } - public long newBrokerId() { - return this.nextAssignBrokerId.getAndIncrement(); + public Long getNextAssignBrokerId() { + return nextAssignBrokerId.get(); } public String getClusterName() { @@ -61,6 +61,7 @@ public class BrokerReplicaInfo { public void addBroker(final Long brokerId, final String ipAddress, final String registerCheckCode) { this.brokerIdInfo.put(brokerId, new Pair<>(ipAddress, registerCheckCode)); + this.nextAssignBrokerId.incrementAndGet(); } public boolean isBrokerExist(final Long brokerId) { @@ -85,4 +86,11 @@ public class BrokerReplicaInfo { } return null; } + + public String getBrokerRegisterCheckCode(final Long brokerId) { + if (this.brokerIdInfo.containsKey(brokerId)) { + return this.brokerIdInfo.get(brokerId).getObject2(); + } + return null; + } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index a5ab46bf7..7eca573cf 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.controller.impl.event.ControllerResult; import org.apache.rocketmq.controller.impl.event.ElectMasterEvent; import org.apache.rocketmq.controller.impl.event.EventMessage; import org.apache.rocketmq.controller.impl.event.EventType; +import org.apache.rocketmq.controller.impl.event.UpdateBrokerAddressEvent; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.ResponseCode; @@ -52,8 +53,16 @@ import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterReques import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; + +import javax.naming.ldap.Control; /** * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory @@ -245,46 +254,127 @@ public class ReplicasInfoManager { return null; } - public ControllerResult<RegisterBrokerToControllerResponseHeader> registerBroker( - final RegisterBrokerToControllerRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) { - String brokerAddress = request.getBrokerAddress(); - final String brokerName = request.getBrokerName(); +// public ControllerResult<RegisterBrokerToControllerResponseHeader> registerBroker( +// final RegisterBrokerToControllerRequestHeader request, final BrokerValidPredicate alivePredicate) { +// String brokerAddress = request.getBrokerAddress(); +// final String brokerName = request.getBrokerName(); +// final String clusterName = request.getClusterName(); +// final ControllerResult<RegisterBrokerToControllerResponseHeader> result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader()); +// final RegisterBrokerToControllerResponseHeader response = result.getResponse(); +// if (!isContainsBroker(brokerName)) { +// result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, "Broker-set hasn't been registered in controller"); +// return result; +// } +// final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); +// final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); +// if (brokerReplicaInfo.isBrokerExist()) +// +// // If the broker's metadata does not exist in the state machine, we can assign the broker a brokerId valued 1 +// // By default, we set this variable to a value of 1 +// long brokerId = MixAll.FIRST_SLAVE_ID; +// boolean shouldApplyBrokerId = true; +// if (isContainsBroker(brokerName)) { +// final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); +// final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); +// +// if (brokerReplicaInfo.isBrokerExist(brokerAddress)) { +// // this broker have registered +// brokerId = brokerReplicaInfo.getBrokerId(brokerAddress); +// shouldApplyBrokerId = false; +// } else { +// // If this broker replicas is first time come online, we need to apply a new id for this replicas. +// brokerId = brokerReplicaInfo.newBrokerId(); +// } +// +// if (syncStateInfo.isMasterExist() && brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) { +// // If the master is alive, just return master info. +// final String masterAddress = syncStateInfo.getMasterAddress(); +// response.setMasterAddress(masterAddress); +// response.setMasterEpoch(syncStateInfo.getMasterEpoch()); +// response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); +// } +// } +// +// response.setBrokerId(brokerId); +// if (response.getMasterAddress() == null) { +// response.setMasterAddress(""); +// } +// if (shouldApplyBrokerId) { +// final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, brokerId); +// result.addEvent(applyIdEvent); +// } +// return result; +// } + + public ControllerResult<GetNextBrokerIdResponseHeader> getNextBrokerId(final GetNextBrokerIdRequestHeader request) { final String clusterName = request.getClusterName(); - final ControllerResult<RegisterBrokerToControllerResponseHeader> result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader()); - final RegisterBrokerToControllerResponseHeader response = result.getResponse(); - // If the broker's metadata does not exist in the state machine, we can assign the broker a brokerId valued 1 - // By default, we set this variable to a value of 1 - long brokerId = MixAll.FIRST_SLAVE_ID; - boolean shouldApplyBrokerId = true; - if (isContainsBroker(brokerName)) { - final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); - final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); + final String brokerName = request.getBrokerName(); + BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); + final ControllerResult<GetNextBrokerIdResponseHeader> result = new ControllerResult<>(new GetNextBrokerIdResponseHeader(clusterName, brokerName)); + final GetNextBrokerIdResponseHeader response = result.getResponse(); + if (brokerReplicaInfo == null) { + // means that none of brokers in this broker-set are registered + response.setNextBrokerId(MixAll.FIRST_SLAVE_ID); + } else { + response.setNextBrokerId(brokerReplicaInfo.getNextAssignBrokerId()); + } + return result; + } - if (brokerReplicaInfo.isBrokerExist(brokerAddress)) { - // this broker have registered - brokerId = brokerReplicaInfo.getBrokerId(brokerAddress); - shouldApplyBrokerId = false; + public ControllerResult<ApplyBrokerIdResponseHeader> applyBrokerId(final ApplyBrokerIdRequestHeader request) { + final String clusterName = request.getClusterName(); + final String brokerName = request.getBrokerName(); + final Long brokerId = request.getAppliedBrokerId(); + final String registerCheckCode = request.getRegisterCheckCode(); + final String brokerAddress = registerCheckCode.split(";")[0]; + BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); + final ControllerResult<ApplyBrokerIdResponseHeader> result = new ControllerResult<>(new ApplyBrokerIdResponseHeader(clusterName, brokerName)); + final ApplyBrokerIdEvent event = new ApplyBrokerIdEvent(clusterName, brokerName, brokerAddress, brokerId, registerCheckCode); + // broker-set unregistered + if (brokerReplicaInfo == null) { + // first brokerId + if (brokerId == MixAll.FIRST_SLAVE_ID) { + result.addEvent(event); } else { - // If this broker replicas is first time come online, we need to apply a new id for this replicas. - brokerId = brokerReplicaInfo.newBrokerId(); - } - - if (syncStateInfo.isMasterExist() && brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) { - // If the master is alive, just return master info. - final String masterAddress = syncStateInfo.getMasterAddress(); - response.setMasterAddress(masterAddress); - response.setMasterEpoch(syncStateInfo.getMasterEpoch()); - response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); + result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_ID_INVALID, String.format("Broker-set: %s hasn't been registered in controller, but broker try to apply brokerId: %d", brokerName, brokerId)); } + return result; + } + // broker-set registered + if (!brokerReplicaInfo.isBrokerExist(brokerId) || registerCheckCode.equals(brokerReplicaInfo.getBrokerRegisterCheckCode(brokerId))) { + // if brokerId hasn't been assigned or brokerId was assigned to this broker + return result; } + result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_ID_INVALID, String.format("Fail to apply brokerId: %d in broker-set: %s", brokerId, brokerName)); + return result; + } - response.setBrokerId(brokerId); - if (response.getMasterAddress() == null) { - response.setMasterAddress(""); + public ControllerResult<RegisterSuccessResponseHeader> registerSuccess(final RegisterSuccessRequestHeader request, final BrokerValidPredicate alivePredicate) { + final String brokerAddress = request.getBrokerAddress(); + final String brokerName = request.getBrokerName(); + final String clusterName = request.getClusterName(); + final Long brokerId = request.getBrokerId(); + final ControllerResult<RegisterSuccessResponseHeader> result = new ControllerResult<>(new RegisterSuccessResponseHeader(clusterName, brokerName)); + final RegisterSuccessResponseHeader response = result.getResponse(); + if (!isContainsBroker(brokerName)) { + result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, String.format("Broker-set: %s hasn't been registered in controller", brokerName)); + return result; + } + final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); + final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); + if (!brokerReplicaInfo.isBrokerExist(brokerId)) { + result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, String.format("BrokerId: %d hasn't been registered in broker-set: %s", brokerId, brokerName)); + return result; + } + if (syncStateInfo.isMasterExist() && alivePredicate.check(clusterName, brokerName, syncStateInfo.getMasterBrokerId())) { + // if master still exist + response.setMasterBrokerId(syncStateInfo.getMasterBrokerId()); + response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(response.getMasterBrokerId())); } - if (shouldApplyBrokerId) { - final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, brokerId); - result.addEvent(applyIdEvent); + // if this broker's address has been changed, we need to update it + if (!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) { + final UpdateBrokerAddressEvent event = new UpdateBrokerAddressEvent(clusterName, brokerName, brokerAddress, brokerId); + result.addEvent(event); } return result; } @@ -487,4 +577,5 @@ public class ReplicasInfoManager { private boolean isContainsBroker(final String brokerName) { return this.replicaInfoTable.containsKey(brokerName) && this.syncStateSetInfoTable.containsKey(brokerName); } + } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java index 6dea0e1d7..da1de6ef1 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.controller.processor; import io.netty.channel.ChannelHandlerContext; import java.io.UnsupportedEncodingException; +import java.sql.Time; import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -36,6 +37,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; @@ -206,6 +210,33 @@ public class ControllerRequestProcessor implements NettyRequestProcessor { return RemotingCommand.createResponseCommand(null); } + private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { + final GetNextBrokerIdRequestHeader requestHeader = (GetNextBrokerIdRequestHeader) request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class); + CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getNextBrokerId(requestHeader); + if (future != null) { + return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); + } + return RemotingCommand.createResponseCommand(null); + } + + private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { + final ApplyBrokerIdRequestHeader requestHeader = (ApplyBrokerIdRequestHeader) request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class); + CompletableFuture<RemotingCommand> future = this.controllerManager.getController().applyBrokerId(requestHeader); + if (future != null) { + return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); + } + return RemotingCommand.createResponseCommand(null); + } + + private RemotingCommand handleRegisterSuccess(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { + RegisterSuccessRequestHeader requestHeader = (RegisterSuccessRequestHeader) request.decodeCommandCustomHeader(RegisterSuccessRequestHeader.class); + CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerSuccess(requestHeader); + if (future != null) { + return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); + } + return RemotingCommand.createResponseCommand(null); + } + private RemotingCommand handleUpdateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) { if (ctx != null) { log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java index 239094c29..9cfd1146e 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java @@ -73,7 +73,7 @@ public class DLedgerControllerTest { config.setMappedFileSize(10 * 1024 * 1024); config.setEnableElectUncleanMaster(isEnableElectUncleanMaster); - final DLedgerController controller = new DLedgerController(config, (str1, str2) -> true); + final DLedgerController controller = new DLedgerController(config, (str1, str2, str3) -> true); controller.startup(); return controller; @@ -198,7 +198,7 @@ public class DLedgerControllerTest { } public void setBrokerAlivePredicate(DLedgerController controller, String... deathBroker) { - controller.setBrokerAlivePredicate((clusterName, brokerAddress) -> { + controller.setBrokerAlivePredicate((clusterName, brokerName, brokerAddress) -> { for (String broker : deathBroker) { if (broker.equals(brokerAddress)) { return false; @@ -209,7 +209,7 @@ public class DLedgerControllerTest { } public void setBrokerElectPolicy(DLedgerController controller, String... deathBroker) { - controller.setElectPolicy(new DefaultElectPolicy((clusterName, brokerAddress) -> { + controller.setElectPolicy(new DefaultElectPolicy((clusterName, brokerName, brokerAddress) -> { for (String broker : deathBroker) { if (broker.equals(brokerAddress)) { return false; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java index 780f519a7..bfc103eb3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java @@ -41,4 +41,20 @@ public class ApplyBrokerIdRequestHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + + public String getClusterName() { + return clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public Long getAppliedBrokerId() { + return appliedBrokerId; + } + + public String getRegisterCheckCode() { + return registerCheckCode; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java index d83164747..1221c206d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java @@ -27,7 +27,6 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader { private String brokerName; - public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) { this.clusterName = clusterName; this.brokerName = brokerName; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java index eee82a8f4..90361ff74 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java @@ -43,4 +43,12 @@ public class GetNextBrokerIdRequestHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + + public String getClusterName() { + return clusterName; + } + + public String getBrokerName() { + return brokerName; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java index ddec9b0ea..3fece1768 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java @@ -28,6 +28,10 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader { private Long nextBrokerId; + public GetNextBrokerIdResponseHeader(String clusterName, String brokerName) { + this(clusterName, brokerName, null); + } + public GetNextBrokerIdResponseHeader(String clusterName, String brokerName, Long nextBrokerId) { this.clusterName = clusterName; this.brokerName = brokerName; @@ -48,6 +52,10 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader { } + public void setNextBrokerId(Long nextBrokerId) { + this.nextBrokerId = nextBrokerId; + } + public Long getNextBrokerId() { return nextBrokerId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java index f67df18b6..f36b22170 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerRequestHeader.java @@ -24,6 +24,7 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea private String clusterName; private String brokerName; private String brokerAddress; + private Long brokerId; @CFNullable private Integer epoch; @CFNullable @@ -38,20 +39,21 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea public RegisterBrokerToControllerRequestHeader() { } - public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress) { - this(clusterName, brokerName, brokerAddress, 0); + public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, Long brokerId) { + this(clusterName, brokerName, brokerAddress, brokerId, 0); } - public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, + public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, Long brokerId, int electionPriority) { - this(clusterName, brokerName, brokerAddress, null, 0, 0, electionPriority); + this(clusterName, brokerName, brokerAddress, brokerId, null, 0, 0, electionPriority); } public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, - Long heartbeatTimeoutMillis, int epoch, long maxOffset, int electionPriority) { + Long brokerId, Long heartbeatTimeoutMillis, int epoch, long maxOffset, int electionPriority) { this.clusterName = clusterName; this.brokerName = brokerName; this.brokerAddress = brokerAddress; + this.brokerId = brokerId; this.heartbeatTimeoutMillis = heartbeatTimeoutMillis; this.epoch = epoch; this.maxOffset = maxOffset; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java index 721509ac9..db5808d6d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java @@ -41,4 +41,20 @@ public class RegisterSuccessRequestHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + + public String getClusterName() { + return clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public Long getBrokerId() { + return brokerId; + } + + public String getBrokerAddress() { + return brokerAddress; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java index 953190480..61e5d8ea1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java @@ -22,9 +22,45 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class RegisterSuccessResponseHeader implements CommandCustomHeader { + private String clusterName; + + private String brokerName; + + private Long masterBrokerId; + + private String masterAddress; + @Override public void checkFields() throws RemotingCommandException { } + public RegisterSuccessResponseHeader(String clusterName, String brokerName) { + this.clusterName = clusterName; + this.brokerName = brokerName; + } + + public void setMasterBrokerId(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; + } + + public void setMasterAddress(String masterAddress) { + this.masterAddress = masterAddress; + } + + public String getClusterName() { + return clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public Long getMasterBrokerId() { + return masterBrokerId; + } + + public String getMasterAddress() { + return masterAddress; + } }
