This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 5da292fba4f3c676964847a2bc653442685b472d Author: TheR1sing3un <[email protected]> AuthorDate: Sun Feb 5 22:18:18 2023 +0800 feat(controller): Improved logic and adaptation testing for persistent broker id versions 1. Improved logic and adaptation testing for persistent broker id versions --- .../broker/controller/ReplicasManager.java | 89 +++-- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 3 +- .../controller/BrokerHeartbeatManager.java | 3 +- .../rocketmq/controller/ControllerManager.java | 55 ++- .../impl/DefaultBrokerHeartbeatManager.java | 15 +- .../impl/manager/ReplicasInfoManager.java | 55 +-- .../processor/ControllerRequestProcessor.java | 24 -- .../impl/controller/ControllerManagerTest.java | 108 +++--- .../impl/controller/ControllerTestBase.java | 27 ++ .../controller/impl/DLedgerControllerTest.java | 194 ++++++----- .../impl/DefaultBrokerHeartbeatManagerTest.java | 2 +- .../impl/manager/ReplicasInfoManagerTest.java | 374 +++++++++++---------- .../remoting/protocol/body/BrokerReplicasInfo.java | 32 +- .../protocol/body/RoleChangeNotifyEntry.java | 11 +- .../NotifyBrokerRoleChangedRequestHeader.java | 37 +- .../controller/AlterSyncStateSetRequestHeader.java | 8 +- .../controller/ElectMasterResponseHeader.java | 12 +- .../controller/GetReplicaInfoRequestHeader.java | 18 +- .../controller/GetReplicaInfoResponseHeader.java | 6 +- .../CleanControllerBrokerDataRequestHeader.java | 4 +- .../register/ApplyBrokerIdRequestHeader.java | 20 ++ .../register/ApplyBrokerIdResponseHeader.java | 17 + .../register/GetNextBrokerIdRequestHeader.java | 12 + .../register/GetNextBrokerIdResponseHeader.java | 19 ++ .../register/RegisterSuccessRequestHeader.java | 19 ++ .../register/RegisterSuccessResponseHeader.java | 31 ++ 26 files changed, 679 insertions(+), 516 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index b2b4a9163..7bbe43e1e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -49,7 +49,6 @@ import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataRespon import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; @@ -178,16 +177,21 @@ public class ReplicasManager { } if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) { - if (registerBrokerToController()) { - LOGGER.info("First time register broker success"); - this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE; - } else { + for (int retryTimes = 0; retryTimes < 5; retryTimes++) { + if (registerBrokerToController()) { + LOGGER.info("First time register broker success"); + this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE; + break; + } + } + // register 5 times but still unsuccessful + if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) { return false; } } if (this.state == State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) { - if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) { + if (this.masterBrokerId != null || brokerElect()) { LOGGER.info("Master in this broker set is elected"); this.state = State.RUNNING; } else { @@ -365,42 +369,42 @@ public class ReplicasManager { } } - private boolean registerBrokerToController() { - // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file. - try { - final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress, - this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, this.brokerConfig.getControllerHeartBeatTimeoutMills(), - this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority()); - final String newMasterAddress = registerResponse.getMasterAddress(); - if (StringUtils.isNoneEmpty(newMasterAddress)) { - if (StringUtils.equals(newMasterAddress, this.localAddress)) { - changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch()); - } else { - changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId()); - } - // Set isolated to false, make broker can register to namesrv regularly - brokerController.setIsolated(false); - } else { - // if master address is empty, just apply the brokerId - if (registerResponse.getBrokerId() <= 0) { - // wrong broker id - LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId()); - return false; - } - this.brokerConfig.setBrokerId(registerResponse.getBrokerId()); - } - return true; - } catch (final Exception e) { - LOGGER.error("Failed to register broker to controller", e); - return false; - } - } +// private boolean registerBrokerToController() { +// // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file. +// try { +// final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress, +// this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, this.brokerConfig.getControllerHeartBeatTimeoutMills(), +// this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority()); +// final String newMasterAddress = registerResponse.getMasterAddress(); +// if (StringUtils.isNoneEmpty(newMasterAddress)) { +// if (StringUtils.equals(newMasterAddress, this.localAddress)) { +// changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch()); +// } else { +// changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId()); +// } +// // Set isolated to false, make broker can register to namesrv regularly +// brokerController.setIsolated(false); +// } else { +// // if master address is empty, just apply the brokerId +// if (registerResponse.getBrokerId() <= 0) { +// // wrong broker id +// LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId()); +// return false; +// } +// this.brokerConfig.setBrokerId(registerResponse.getBrokerId()); +// } +// return true; +// } catch (final Exception e) { +// LOGGER.error("Failed to register broker to controller", e); +// return false; +// } +// } /** * Register broker to controller, and persist the metadata to file * @return whether registering process succeeded */ - private boolean registerBrokerToController2() { + private boolean registerBrokerToController() { try { // 1. confirm now registering state confirmNowRegisteringState(); @@ -514,6 +518,17 @@ public class ReplicasManager { private boolean registerSuccess() { try { RegisterSuccessResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, localAddress, controllerLeaderAddress); + final Long masterBrokerId = response.getMasterBrokerId(); + final String masterAddress = response.getMasterAddress(); + if (masterBrokerId == null) { + return true; + } + if (this.brokerId.equals(masterBrokerId)) { + changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch()); + } else { + changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId); + } + brokerController.setIsolated(false); return true; } catch (Exception e) { LOGGER.error("fail to send registerSuccess request to controller", e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 054f1edaa..5f8c670a8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -136,7 +136,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMetrics; import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; -import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_ID_INVALID; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED; @@ -1258,7 +1257,7 @@ public class BrokerOuterAPI { */ public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final String controllerAddress, final String brokerName, final String brokerAddress) throws Exception { - final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName, brokerAddress); + final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); assert response != null; diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java index 81e3cf31c..ed021bb88 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java @@ -61,7 +61,6 @@ public interface BrokerHeartbeatManager { /** * Trigger when broker inactive. */ - void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress, - final long brokerId); + void onBrokerInactive(final String clusterName, final String brokerName, final Long brokerId); } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 116607cc1..0f565ec81 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; @@ -116,18 +115,17 @@ public class ControllerManager { * * @param clusterName The cluster name of this inactive broker * @param brokerName The inactive broker name - * @param brokerAddress The inactive broker address(ip) * @param brokerId The inactive broker id */ - private void onBrokerInactive(String clusterName, String brokerName, String brokerAddress, long brokerId) { + private void onBrokerInactive(String clusterName, String brokerName, Long brokerId) { if (controller.isLeaderState()) { try { - final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)); + final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)); final RemotingCommand replicaInfoResponse = replicaInfoFuture.get(5, TimeUnit.SECONDS); final GetReplicaInfoResponseHeader replicaInfoResponseHeader = (GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader(); // Not master broker offline - if (!replicaInfoResponseHeader.getMasterAddress().equals(brokerAddress)) { - log.warn("The {} broker with IP address {} shutdown", brokerName, brokerAddress); + if (!brokerId.equals(replicaInfoResponseHeader.getMasterBrokerId())) { + log.warn("The broker with brokerId: {} in broker-set: {} shutdown", brokerId, brokerName); return; } @@ -135,7 +133,7 @@ public class ControllerManager { final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS); final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader(); if (responseHeader != null) { - log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader); + log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, responseHeader); if (controllerConfig.isNotifyBrokerRoleChanged()) { notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader)); } @@ -144,7 +142,7 @@ public class ControllerManager { log.error("", e); } } else { - log.info("The {} broker with IP address {} shutdown", brokerName, brokerAddress); + log.warn("The broker with brokerId: {} in broker-set: {} shutdown", brokerId, brokerName); } } @@ -154,38 +152,35 @@ public class ControllerManager { public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) { final BrokerMemberGroup memberGroup = entry.getBrokerMemberGroup(); if (memberGroup != null) { - final String master = entry.getMasterAddress(); - if (StringUtils.isEmpty(master)) { - log.warn("Notify broker role change failed, because member group is not null but the new master address is empty, entry:{}", entry); + final Long masterBrokerId = entry.getMasterBrokerId(); + String clusterName = memberGroup.getCluster(); + String brokerName = memberGroup.getBrokerName(); + if (masterBrokerId == null) { + log.warn("Notify broker role change failed, because member group is not null but the new master brokerId is empty, entry:{}", entry); return; } - // First, inform the master - if (this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), master)) { - doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, entry); - } - - // Then, inform all slaves - final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs(); - for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) { - if (!master.equals(broker.getValue()) && this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), broker.getValue())) { - doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), entry); - } - } - + // Inform all active brokers + final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs(); + brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey())) + .forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), entry)); } } - public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId, - final RoleChangeNotifyEntry entry) { + /** + * Notify broker that there are roles-changing in controller + * @param brokerAddr target broker's address to notify + * @param entry role change entry + */ + public void doNotifyBrokerRoleChanged(final String brokerAddr, final RoleChangeNotifyEntry entry) { if (StringUtils.isNoneEmpty(brokerAddr)) { - log.info("Try notify broker {} with id {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, brokerId, entry); - final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), - entry.getMasterEpoch(), entry.getSyncStateSetEpoch(), brokerId); + log.info("Try notify broker {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, entry); + final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), entry.getMasterBrokerId(), + entry.getMasterEpoch(), entry.getSyncStateSetEpoch()); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader); try { this.remotingClient.invokeOneway(brokerAddr, request, 3000); } catch (final Exception e) { - log.error("Failed to notify broker {} with id {} that role changed", brokerAddr, brokerId, e); + log.error("Failed to notify broker {} that role changed", brokerAddr, e); } } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java index 3045da85e..39edce507 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java @@ -72,14 +72,14 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { final Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = iterator.next(); long last = next.getValue().getLastUpdateTimestamp(); long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis(); - if ((last + timeoutMillis) < System.currentTimeMillis()) { + if (System.currentTimeMillis() - last > timeoutMillis) { final Channel channel = next.getValue().getChannel(); iterator.remove(); if (channel != null) { RemotingHelper.closeChannel(channel); } this.executor.submit(() -> - notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), next.getValue().getBrokerId())); + notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().getBrokerName(), next.getValue().getBrokerId())); log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis); } } @@ -88,9 +88,9 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { } } - private void notifyBrokerInActive(String clusterName, String brokerName, String brokerAddr, Long brokerId) { + private void notifyBrokerInActive(String clusterName, String brokerName, Long brokerId) { for (BrokerLifecycleListener listener : this.brokerLifecycleListeners) { - listener.onBrokerInactive(clusterName, brokerName, brokerAddr, brokerId); + listener.onBrokerInactive(clusterName, brokerName, brokerId); } } @@ -126,9 +126,6 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { prev.setLastUpdateTimestamp(System.currentTimeMillis()); prev.setHeartbeatTimeoutMillis(realTimeoutMillis); prev.setElectionPriority(realElectionPriority); - prev.setBrokerId(realBrokerId); - prev.setBrokerAddr(brokerAddr); - prev.setChannel(channel); if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset()) { prev.setEpoch(realEpoch); prev.setMaxOffset(realMaxOffset); @@ -143,10 +140,10 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { BrokerAddrInfo addrInfo = null; for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { if (entry.getValue().getChannel() == channel) { - log.info("Channel {} inactive, broker {}, addr:{}, id:{}", entry.getValue().getChannel(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId()); + log.info("Channel {} inactive, broker {}, addr:{}, id:{}", entry.getValue().getChannel(), entry.getValue().getBrokerName(), entry.getValue().getBrokerAddr(), entry.getValue().getBrokerId()); addrInfo = entry.getKey(); this.executor.submit(() -> - notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId())); + notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().getBrokerName(), entry.getValue().getBrokerId())); break; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 1400932e0..d2061bb24 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -168,7 +168,6 @@ public class ReplicasInfoManager { final ElectMasterResponseHeader response = result.getResponse(); if (!isContainsBroker(brokerName)) { // this broker set hasn't been registered - response.setMasterAddress(""); result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, "Broker hasn't been registered"); return result; } @@ -233,7 +232,6 @@ public class ReplicasInfoManager { } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } - response.setMasterAddress(""); return result; } @@ -250,58 +248,6 @@ public class ReplicasInfoManager { return null; } -// public ControllerResult<RegisterBrokerToControllerResponseHeader> registerBroker( -// final RegisterBrokerToControllerRequestHeader request, final BrokerValidPredicate alivePredicate) { -// String brokerAddress = request.getBrokerAddress(); -// final String brokerName = request.getBrokerName(); -// final String clusterName = request.getClusterName(); -// final ControllerResult<RegisterBrokerToControllerResponseHeader> result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader()); -// final RegisterBrokerToControllerResponseHeader response = result.getResponse(); -// if (!isContainsBroker(brokerName)) { -// result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, "Broker-set hasn't been registered in controller"); -// return result; -// } -// final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); -// final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); -// if (brokerReplicaInfo.isBrokerExist()) -// -// // If the broker's metadata does not exist in the state machine, we can assign the broker a brokerId valued 1 -// // By default, we set this variable to a value of 1 -// long brokerId = MixAll.FIRST_SLAVE_ID; -// boolean shouldApplyBrokerId = true; -// if (isContainsBroker(brokerName)) { -// final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); -// final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); -// -// if (brokerReplicaInfo.isBrokerExist(brokerAddress)) { -// // this broker have registered -// brokerId = brokerReplicaInfo.getBrokerId(brokerAddress); -// shouldApplyBrokerId = false; -// } else { -// // If this broker replicas is first time come online, we need to apply a new id for this replicas. -// brokerId = brokerReplicaInfo.newBrokerId(); -// } -// -// if (syncStateInfo.isMasterExist() && brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) { -// // If the master is alive, just return master info. -// final String masterAddress = syncStateInfo.getMasterAddress(); -// response.setMasterAddress(masterAddress); -// response.setMasterEpoch(syncStateInfo.getMasterEpoch()); -// response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); -// } -// } -// -// response.setBrokerId(brokerId); -// if (response.getMasterAddress() == null) { -// response.setMasterAddress(""); -// } -// if (shouldApplyBrokerId) { -// final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, brokerId); -// result.addEvent(applyIdEvent); -// } -// return result; -// } - public ControllerResult<GetNextBrokerIdResponseHeader> getNextBrokerId(final GetNextBrokerIdRequestHeader request) { final String clusterName = request.getClusterName(); final String brokerName = request.getBrokerName(); @@ -339,6 +285,7 @@ public class ReplicasInfoManager { // broker-set registered if (!brokerReplicaInfo.isBrokerExist(brokerId) || registerCheckCode.equals(brokerReplicaInfo.getBrokerRegisterCheckCode(brokerId))) { // if brokerId hasn't been assigned or brokerId was assigned to this broker + result.addEvent(event); return result; } result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_ID_INVALID, String.format("Fail to apply brokerId: %d in broker-set: %s", brokerId, brokerName)); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java index da1de6ef1..9e300b738 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.controller.processor; import io.netty.channel.ChannelHandlerContext; import java.io.UnsupportedEncodingException; -import java.sql.Time; import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -46,8 +45,6 @@ import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanContro import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; import static org.apache.rocketmq.remoting.protocol.RequestCode.APPLY_BROKER_ID; import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT; @@ -57,7 +54,6 @@ import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ELECT import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO; import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO; import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA; -import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_REGISTER_BROKER; import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONTROLLER_CONFIG; import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_NEXT_BROKER_ID; import static org.apache.rocketmq.remoting.protocol.RequestCode.REGISTER_SUCCESS; @@ -90,8 +86,6 @@ public class ControllerRequestProcessor implements NettyRequestProcessor { return this.handleAlterSyncStateSet(ctx, request); case CONTROLLER_ELECT_MASTER: return this.handleControllerElectMaster(ctx, request); - case CONTROLLER_REGISTER_BROKER: - return this.handleControllerRegisterBroker(ctx, request); case CONTROLLER_GET_REPLICA_INFO: return this.handleControllerGetReplicaInfo(ctx, request); case CONTROLLER_GET_METADATA_INFO: @@ -148,24 +142,6 @@ public class ControllerRequestProcessor implements NettyRequestProcessor { return RemotingCommand.createResponseCommand(null); } - - private RemotingCommand handleControllerRegisterBroker(ChannelHandlerContext ctx, - RemotingCommand request) throws Exception { - final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class); - final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest); - if (future != null) { - final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); - final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader(); - if (responseHeader != null && responseHeader.getBrokerId() >= 0) { - this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(), - responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(), - controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority()); - } - return response; - } - return RemotingCommand.createResponseCommand(null); - } - private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class); diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java index 6a54a15fc..49c56f06b 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java @@ -25,9 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.ControllerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.controller.ControllerManager; import org.apache.rocketmq.controller.impl.DLedgerController; import org.apache.rocketmq.remoting.RemotingClient; @@ -36,21 +34,23 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; -import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST; -import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -114,6 +114,7 @@ public class ControllerManagerTest { } return null; }, item -> item != null); + System.out.println("leader born!!!!!"); return manager; } @@ -128,35 +129,56 @@ public class ControllerManagerTest { /** * Register broker to controller */ - public RegisterBrokerToControllerResponseHeader registerBroker( + public void registerBroker( final String controllerAddress, final String clusterName, - final String brokerName, final String address, final RemotingClient client, - final long heartbeatTimeoutMillis) throws Exception { + final String brokerName, final Long brokerId, final String brokerAddress, final Long expectMasterBrokerId, final RemotingClient client) throws Exception { + // Get next brokerId; + final GetNextBrokerIdRequestHeader getNextBrokerIdRequestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName); + final RemotingCommand getNextBrokerIdRequest = RemotingCommand.createRequestCommand(RequestCode.GET_NEXT_BROKER_ID, getNextBrokerIdRequestHeader); + final RemotingCommand getNextBrokerIdResponse = client.invokeSync(controllerAddress, getNextBrokerIdRequest, 3000); + final GetNextBrokerIdResponseHeader getNextBrokerIdResponseHeader = (GetNextBrokerIdResponseHeader) getNextBrokerIdResponse.decodeCommandCustomHeader(GetNextBrokerIdResponseHeader.class); + String registerCheckCode = brokerAddress + ";" + System.currentTimeMillis(); + assertEquals(ResponseCode.SUCCESS, getNextBrokerIdResponse.getCode()); + assertEquals(brokerId, getNextBrokerIdResponseHeader.getNextBrokerId()); - final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, heartbeatTimeoutMillis, 1, 1000L, 0); - final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader); - final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000); - assertNotNull(response); - switch (response.getCode()) { - case SUCCESS: { - return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class); - } - case CONTROLLER_NOT_LEADER: { - throw new MQBrokerException(response.getCode(), "Controller leader was changed"); - } - } - throw new MQBrokerException(response.getCode(), response.getRemark()); + // Apply brokerId + final ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode); + final RemotingCommand applyBrokerIdRequest = RemotingCommand.createRequestCommand(RequestCode.APPLY_BROKER_ID, applyBrokerIdRequestHeader); + final RemotingCommand applyBrokerIdResponse = client.invokeSync(controllerAddress, applyBrokerIdRequest, 3000); + final ApplyBrokerIdResponseHeader applyBrokerIdResponseHeader = (ApplyBrokerIdResponseHeader) applyBrokerIdResponse.decodeCommandCustomHeader(ApplyBrokerIdResponseHeader.class); + assertEquals(ResponseCode.SUCCESS, applyBrokerIdResponse.getCode()); + + // Register success + final RegisterSuccessRequestHeader registerSuccessRequestHeader = new RegisterSuccessRequestHeader(clusterName, brokerName, brokerId, brokerAddress); + final RemotingCommand registerSuccessRequest = RemotingCommand.createRequestCommand(RequestCode.REGISTER_SUCCESS, registerSuccessRequestHeader); + final RemotingCommand registerSuccessResponse = client.invokeSync(controllerAddress, registerSuccessRequest, 3000); + final RegisterSuccessResponseHeader registerSuccessResponseHeader = (RegisterSuccessResponseHeader) registerSuccessResponse.decodeCommandCustomHeader(RegisterSuccessResponseHeader.class); + assertEquals(ResponseCode.SUCCESS, registerSuccessResponse.getCode()); + assertEquals(expectMasterBrokerId, registerSuccessResponseHeader.getMasterBrokerId()); } public RemotingCommand brokerTryElect(final String controllerAddress, final String clusterName, - final String brokerName, final String brokerAddress, final RemotingClient client) throws Exception { - final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress); + final String brokerName, final Long brokerId, final RemotingClient client) throws Exception { + final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader); RemotingCommand response = client.invokeSync(controllerAddress, request, 3000); assertNotNull(response); return response; } + public void sendHeartbeat(final String controllerAddress, final String clusterName, final String brokerName, final Long brokerId, + final String brokerAddress, final Long timeout, final RemotingClient client) throws Exception { + final BrokerHeartbeatRequestHeader heartbeatRequestHeader0 = new BrokerHeartbeatRequestHeader(); + heartbeatRequestHeader0.setBrokerId(brokerId); + heartbeatRequestHeader0.setClusterName(clusterName); + heartbeatRequestHeader0.setBrokerName(brokerName); + heartbeatRequestHeader0.setBrokerAddr(brokerAddress); + heartbeatRequestHeader0.setHeartbeatTimeoutMills(timeout); + final RemotingCommand heartbeatRequest = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, heartbeatRequestHeader0); + RemotingCommand remotingCommand = client.invokeSync(controllerAddress, heartbeatRequest, 3000); + assertEquals(ResponseCode.SUCCESS, remotingCommand.getCode()); + } + @Test public void testSomeApi() throws Exception { mockData(); @@ -164,37 +186,41 @@ public class ControllerManagerTest { String leaderAddr = "localhost" + ":" + leader.getController().getRemotingServer().localListenPort(); // Register two broker - final RegisterBrokerToControllerResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient, 1000L); - assert responseHeader1 != null; - assertEquals(responseHeader1.getBrokerId(), MixAll.FIRST_SLAVE_ID); + registerBroker(leaderAddr, "cluster1", "broker1", 1L, "127.0.0.1:8000", null, this.remotingClient); + + registerBroker(leaderAddr, "cluster1", "broker1", 2L, "127.0.0.1:8001", null, this.remotingClient1); - final RegisterBrokerToControllerResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1, 4000L); - assert responseHeader2 != null; - assertEquals(responseHeader2.getBrokerId(), MixAll.FIRST_SLAVE_ID + 1); + // Send heartbeat + sendHeartbeat(leaderAddr, "cluster1", "broker1", 1L, "127.0.0.1:8000", 3000L, remotingClient); + sendHeartbeat(leaderAddr, "cluster1", "broker1", 2L, "127.0.0.1:8001", 3000L, remotingClient1); // Two all try elect itself as master, but only the first can be the master - RemotingCommand tryElectCommand1 = brokerTryElect(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient); + RemotingCommand tryElectCommand1 = brokerTryElect(leaderAddr, "cluster1", "broker1", 1L, this.remotingClient); ElectMasterResponseHeader brokerTryElectResponseHeader1 = (ElectMasterResponseHeader) tryElectCommand1.decodeCommandCustomHeader(ElectMasterResponseHeader.class); - RemotingCommand tryElectCommand2 = brokerTryElect(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1); + RemotingCommand tryElectCommand2 = brokerTryElect(leaderAddr, "cluster1", "broker1", 2L, this.remotingClient1); ElectMasterResponseHeader brokerTryElectResponseHeader2 = (ElectMasterResponseHeader) tryElectCommand2.decodeCommandCustomHeader(ElectMasterResponseHeader.class); - assertEquals(SUCCESS, tryElectCommand1.getCode()); + assertEquals(ResponseCode.SUCCESS, tryElectCommand1.getCode()); + assertEquals(1L, brokerTryElectResponseHeader1.getMasterBrokerId().longValue()); assertEquals("127.0.0.1:8000", brokerTryElectResponseHeader1.getMasterAddress()); - assertEquals(1L, brokerTryElectResponseHeader1.getMasterEpoch()); - assertEquals(1L, brokerTryElectResponseHeader1.getSyncStateSetEpoch()); + assertEquals(1, brokerTryElectResponseHeader1.getMasterEpoch().intValue()); + assertEquals(1, brokerTryElectResponseHeader1.getSyncStateSetEpoch().intValue()); - assertEquals(CONTROLLER_MASTER_STILL_EXIST, tryElectCommand2.getCode()); + assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, tryElectCommand2.getCode()); + assertEquals(1L, brokerTryElectResponseHeader2.getMasterBrokerId().longValue()); assertEquals("127.0.0.1:8000", brokerTryElectResponseHeader2.getMasterAddress()); - assertEquals(1L, brokerTryElectResponseHeader2.getMasterEpoch()); - assertEquals(1L, brokerTryElectResponseHeader2.getSyncStateSetEpoch()); + assertEquals(1, brokerTryElectResponseHeader2.getMasterEpoch().intValue()); + assertEquals(1, brokerTryElectResponseHeader2.getSyncStateSetEpoch().intValue()); - // Send heartbeat for broker2 + // Send heartbeat for broker2 every one second ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate(() -> { final BrokerHeartbeatRequestHeader heartbeatRequestHeader = new BrokerHeartbeatRequestHeader(); heartbeatRequestHeader.setClusterName("cluster1"); heartbeatRequestHeader.setBrokerName("broker1"); heartbeatRequestHeader.setBrokerAddr("127.0.0.1:8001"); + heartbeatRequestHeader.setBrokerId(2L); + heartbeatRequestHeader.setHeartbeatTimeoutMills(3000L); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, heartbeatRequestHeader); try { final RemotingCommand remotingCommand = this.remotingClient1.invokeSync(leaderAddr, request, 3000); @@ -207,7 +233,7 @@ public class ControllerManagerTest { final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader); final RemotingCommand response = this.remotingClient1.invokeSync(leaderAddr, request, 3000); final GetReplicaInfoResponseHeader responseHeader = (GetReplicaInfoResponseHeader) response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class); - return StringUtils.equals(responseHeader.getMasterAddress(), "127.0.0.1:8001"); + return responseHeader.getMasterBrokerId().equals(2L); }, item -> item); // The new master should be broker2. diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java new file mode 100644 index 000000000..9b8fa757c --- /dev/null +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.controller.impl.controller; + +public class ControllerTestBase { + + public final static String DEFAULT_CLUSTER_NAME = "cluster-a"; + + public final static String DEFAULT_BROKER_NAME = "broker-set-a"; + + public final static String[] DEFAULT_IP = {"127.0.0.1:9000", "127.0.0.1:9001", "127.0.0.1:9002"}; +} diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java index 9cfd1146e..7b3953508 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java @@ -22,11 +22,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.controller.Controller; @@ -41,16 +41,20 @@ import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterReques import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME; +import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME; +import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -96,42 +100,44 @@ public class DLedgerControllerTest { } public void registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress, - long expectBrokerId) throws Exception { - // Register new broker - final RegisterBrokerToControllerRequestHeader registerRequest = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress); - RemotingCommand response = await().atMost(Duration.ofSeconds(20)).until(() -> { - try { - final RemotingCommand responseInner = leader.registerBroker(registerRequest).get(2, TimeUnit.SECONDS); - if (responseInner == null || responseInner.getCode() != ResponseCode.SUCCESS) { - return null; - } - return responseInner; - } catch (Exception e) { - e.printStackTrace(); - return null; - } - }, Objects::nonNull); + Long expectBrokerId) throws Exception { + // Get next brokerId + final GetNextBrokerIdRequestHeader getNextBrokerIdRequest = new GetNextBrokerIdRequestHeader(clusterName, brokerName); + RemotingCommand remotingCommand = leader.getNextBrokerId(getNextBrokerIdRequest).get(2, TimeUnit.SECONDS); + GetNextBrokerIdResponseHeader getNextBrokerIdResp = (GetNextBrokerIdResponseHeader) remotingCommand.readCustomHeader(); + Long nextBrokerId = getNextBrokerIdResp.getNextBrokerId(); + String registerCheckCode = brokerAddress + ";" + System.currentTimeMillis(); - final RegisterBrokerToControllerResponseHeader registerResult = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader(); + // Check response + assertEquals(expectBrokerId, nextBrokerId); - assertEquals(expectBrokerId, registerResult.getBrokerId()); - } + // Apply brokerId + final ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, nextBrokerId, registerCheckCode); + RemotingCommand remotingCommand1 = leader.applyBrokerId(applyBrokerIdRequestHeader).get(2, TimeUnit.SECONDS); - public void brokerTryElectMaster(Controller leader, String clusterName, String brokerName, String brokerAddress, - boolean exceptSuccess) { - final ElectMasterRequestHeader electMasterRequestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress); - RemotingCommand command = await().atMost(Duration.ofSeconds(20)).until(() -> { - return leader.electMaster(electMasterRequestHeader).get(2, TimeUnit.SECONDS); - }, Objects::nonNull); + // Check response + assertEquals(ResponseCode.SUCCESS, remotingCommand1.getCode()); + + // Register success + final RegisterSuccessRequestHeader registerSuccessRequestHeader = new RegisterSuccessRequestHeader(clusterName, brokerName, nextBrokerId, brokerAddress); + RemotingCommand remotingCommand2 = leader.registerSuccess(registerSuccessRequestHeader).get(2, TimeUnit.SECONDS); + + + assertEquals(ResponseCode.SUCCESS, remotingCommand2.getCode()); + } + public void brokerTryElectMaster(Controller leader, String clusterName, String brokerName, String brokerAddress, Long brokerId, + boolean exceptSuccess) throws Exception { + final ElectMasterRequestHeader electMasterRequestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); + RemotingCommand command = leader.electMaster(electMasterRequestHeader).get(2, TimeUnit.SECONDS); ElectMasterResponseHeader header = (ElectMasterResponseHeader) command.readCustomHeader(); assertEquals(exceptSuccess, ResponseCode.SUCCESS == command.getCode()); } - private boolean alterNewInSyncSet(Controller leader, String brokerName, String masterAddress, int masterEpoch, - Set<String> newSyncStateSet, int syncStateSetEpoch) throws Exception { + private boolean alterNewInSyncSet(Controller leader, String brokerName, Long masterBrokerId, Integer masterEpoch, + Set<Long> newSyncStateSet, Integer syncStateSetEpoch) throws Exception { final AlterSyncStateSetRequestHeader alterRequest = - new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch); + new AlterSyncStateSetRequestHeader(brokerName, masterBrokerId, masterEpoch); final RemotingCommand response = leader.alterSyncStateSet(alterRequest, new SyncStateSet(newSyncStateSet, syncStateSetEpoch)).get(10, TimeUnit.SECONDS); if (null == response || response.getCode() != ResponseCode.SUCCESS) { return false; @@ -177,30 +183,30 @@ public class DLedgerControllerTest { DLedgerController leader = waitLeader(controllers); // register - registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", 1L); - registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", 2L); - registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", 3L); + registerNewBroker(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L); + registerNewBroker(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L); + registerNewBroker(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L); // try elect - brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9000", true); - brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9001", false); - brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9002", false); - final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS); + brokerTryElectMaster(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L,true); + brokerTryElectMaster(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, false); + brokerTryElectMaster(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L,false); + final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).get(10, TimeUnit.SECONDS); final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader(); - assertEquals(1, replicaInfo.getMasterEpoch()); - assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress()); + assertEquals(1, replicaInfo.getMasterEpoch().intValue()); + assertEquals(DEFAULT_IP[0], replicaInfo.getMasterAddress()); // Try alter sync state set - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add("127.0.0.1:9000"); - newSyncStateSet.add("127.0.0.1:9001"); - newSyncStateSet.add("127.0.0.1:9002"); - assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 1)); + final HashSet<Long> newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(1L); + newSyncStateSet.add(2L); + newSyncStateSet.add(3L); + assertTrue(alterNewInSyncSet(leader, DEFAULT_BROKER_NAME, 1L, 1, newSyncStateSet, 1)); return leader; } - public void setBrokerAlivePredicate(DLedgerController controller, String... deathBroker) { - controller.setBrokerAlivePredicate((clusterName, brokerName, brokerAddress) -> { - for (String broker : deathBroker) { - if (broker.equals(brokerAddress)) { + public void setBrokerAlivePredicate(DLedgerController controller, Long... deathBroker) { + controller.setBrokerAlivePredicate((clusterName, brokerName, brokerId) -> { + for (Long broker : deathBroker) { + if (broker.equals(brokerId)) { return false; } } @@ -208,10 +214,10 @@ public class DLedgerControllerTest { }); } - public void setBrokerElectPolicy(DLedgerController controller, String... deathBroker) { - controller.setElectPolicy(new DefaultElectPolicy((clusterName, brokerName, brokerAddress) -> { - for (String broker : deathBroker) { - if (broker.equals(brokerAddress)) { + public void setBrokerElectPolicy(DLedgerController controller, Long... deathBroker) { + controller.setElectPolicy(new DefaultElectPolicy((clusterName, brokerName, brokerId) -> { + for (Long broker : deathBroker) { + if (broker.equals(brokerId)) { return false; } } @@ -222,78 +228,80 @@ public class DLedgerControllerTest { @Test public void testElectMaster() throws Exception { final DLedgerController leader = mockMetaData(false); - final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1"); - setBrokerElectPolicy(leader, "127.0.0.1:9000"); + final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); + setBrokerElectPolicy(leader, 1L); final RemotingCommand resp = leader.electMaster(request).get(10, TimeUnit.SECONDS); final ElectMasterResponseHeader response = (ElectMasterResponseHeader) resp.readCustomHeader(); - assertEquals(response.getMasterEpoch(), 2); - assertFalse(response.getMasterAddress().isEmpty()); - assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000"); + assertEquals(2, response.getMasterEpoch().intValue()); + assertNotEquals(1L, response.getMasterBrokerId().longValue()); + assertNotEquals(DEFAULT_IP[0], response.getMasterAddress()); } @Test public void testAllReplicasShutdownAndRestartWithUnEnableElectUnCleanMaster() throws Exception { final DLedgerController leader = mockMetaData(false); - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add("127.0.0.1:9000"); + final HashSet<Long> newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(1L); - assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2)); + assertTrue(alterNewInSyncSet(leader, DEFAULT_BROKER_NAME, 1L, 1, newSyncStateSet, 2)); // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master. - // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed. - final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1"); - setBrokerElectPolicy(leader, "127.0.0.1:9000"); + // However, the syncStateSet in statemachine is {1}, not more replicas can be elected as master, it will be failed. + final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); + setBrokerElectPolicy(leader, 1L); leader.electMaster(electRequest).get(10, TimeUnit.SECONDS); - final RemotingCommand resp = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")). + final RemotingCommand resp = leader.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)). get(10, TimeUnit.SECONDS); final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) resp.readCustomHeader(); final SyncStateSet syncStateSet = RemotingSerializable.decode(resp.getBody(), SyncStateSet.class); assertEquals(syncStateSet.getSyncStateSet(), newSyncStateSet); - assertEquals(replicaInfo.getMasterAddress(), ""); - assertEquals(replicaInfo.getMasterEpoch(), 2); + assertEquals(null, replicaInfo.getMasterAddress()); + assertEquals(2, replicaInfo.getMasterEpoch().intValue()); - // Now, we start broker1 - 127.0.0.1:9001 to try elect, but it was not in syncStateSet, so it will not be elected as master. + // Now, we start broker - id[2]address[127.0.0.1:9001] to try elect, but it was not in syncStateSet, so it will not be elected as master. final ElectMasterRequestHeader request1 = - ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", "127.0.0.1:9001"); + ElectMasterRequestHeader.ofBrokerTrigger(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 2L); final ElectMasterResponseHeader r1 = (ElectMasterResponseHeader) leader.electMaster(request1).get(10, TimeUnit.SECONDS).readCustomHeader(); - assertEquals(r1.getMasterAddress(), ""); + assertEquals(null, r1.getMasterBrokerId()); + assertEquals(null, r1.getMasterAddress()); - // Now, we start broker1 - 127.0.0.1:9000 to try elect, it will be elected as master + // Now, we start broker - id[1]address[127.0.0.1:9000] to try elect, it will be elected as master setBrokerElectPolicy(leader); final ElectMasterRequestHeader request2 = - ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", "127.0.0.1:9000"); + ElectMasterRequestHeader.ofBrokerTrigger(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 1L); final ElectMasterResponseHeader r2 = (ElectMasterResponseHeader) leader.electMaster(request2).get(10, TimeUnit.SECONDS).readCustomHeader(); - assertEquals(r2.getMasterAddress(), "127.0.0.1:9000"); - assertEquals(r2.getMasterEpoch(), 3); + assertEquals(1L, r2.getMasterBrokerId().longValue()); + assertEquals(DEFAULT_IP[0], r2.getMasterAddress()); + assertEquals(3, r2.getMasterEpoch().intValue()); } @Test public void testEnableElectUnCleanMaster() throws Exception { final DLedgerController leader = mockMetaData(true); - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add("127.0.0.1:9000"); + final HashSet<Long> newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(1L); - assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2)); + assertTrue(alterNewInSyncSet(leader, DEFAULT_BROKER_NAME, 1L, 1, newSyncStateSet, 2)); // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master. - // However, event if the syncStateSet in statemachine is {"127.0.0.1:9000"} + // However, event if the syncStateSet in statemachine is {DEFAULT_IP[0]} // the option {enableElectUncleanMaster = true}, so the controller sill can elect a new master - final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1"); - setBrokerElectPolicy(leader, "127.0.0.1:9000"); + final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); + setBrokerElectPolicy(leader, 1L); final CompletableFuture<RemotingCommand> future = leader.electMaster(electRequest); future.get(10, TimeUnit.SECONDS); - final RemotingCommand resp = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS); + final RemotingCommand resp = leader.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).get(10, TimeUnit.SECONDS); final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) resp.readCustomHeader(); final SyncStateSet syncStateSet = RemotingSerializable.decode(resp.getBody(), SyncStateSet.class); - final HashSet<String> newSyncStateSet2 = new HashSet<>(); - newSyncStateSet2.add(replicaInfo.getMasterAddress()); + final HashSet<Long> newSyncStateSet2 = new HashSet<>(); + newSyncStateSet2.add(replicaInfo.getMasterBrokerId()); assertEquals(syncStateSet.getSyncStateSet(), newSyncStateSet2); - assertNotEquals(replicaInfo.getMasterAddress(), ""); - assertNotEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000"); - assertEquals(replicaInfo.getMasterEpoch(), 2); + assertNotEquals(1L, replicaInfo.getMasterBrokerId().longValue()); + assertNotEquals(DEFAULT_IP[0], replicaInfo.getMasterAddress()); + assertEquals(2, replicaInfo.getMasterEpoch().intValue()); } @Test @@ -306,7 +314,7 @@ public class DLedgerControllerTest { assertNotNull(newLeader); RemotingCommand response = await().atMost(Duration.ofSeconds(10)).until(() -> { - final RemotingCommand resp = newLeader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS); + final RemotingCommand resp = newLeader.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).get(10, TimeUnit.SECONDS); if (resp.getCode() == ResponseCode.SUCCESS) { return resp; @@ -316,13 +324,13 @@ public class DLedgerControllerTest { }, item -> item != null); final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) response.readCustomHeader(); final SyncStateSet syncStateSetResult = RemotingSerializable.decode(response.getBody(), SyncStateSet.class); - assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000"); - assertEquals(replicaInfo.getMasterEpoch(), 1); + assertEquals(replicaInfo.getMasterAddress(), DEFAULT_IP[0]); + assertEquals(1, replicaInfo.getMasterEpoch().intValue()); - final HashSet<String> syncStateSet = new HashSet<>(); - syncStateSet.add("127.0.0.1:9000"); - syncStateSet.add("127.0.0.1:9001"); - syncStateSet.add("127.0.0.1:9002"); + final HashSet<Long> syncStateSet = new HashSet<>(); + syncStateSet.add(1L); + syncStateSet.add(2L); + syncStateSet.add(3L); assertEquals(syncStateSetResult.getSyncStateSet(), syncStateSet); } } diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java index 306acf5b6..7b1e086e3 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java @@ -40,7 +40,7 @@ public class DefaultBrokerHeartbeatManagerTest { @Test public void testDetectBrokerAlive() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> { + this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerId) -> { latch.countDown(); }); this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:7000", 1L,3000L, null, diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java index d5cad8188..3b93b6740 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java @@ -20,11 +20,11 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.ControllerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.controller.elect.ElectPolicy; import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; +import org.apache.rocketmq.controller.helper.BrokerValidPredicate; import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager; import org.apache.rocketmq.controller.impl.event.ControllerResult; import org.apache.rocketmq.controller.impl.event.ElectMasterEvent; @@ -41,16 +41,24 @@ import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterReques import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME; +import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME; +import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class ReplicasInfoManagerTest { @@ -60,11 +68,9 @@ public class ReplicasInfoManagerTest { private ControllerConfig config; - private ElectPolicy electPolicy; @Before public void init() { - this.electPolicy = new DefaultElectPolicy((clusterName, brokerAddr) -> true, null); this.config = new ControllerConfig(); this.config.setEnableElectUncleanMaster(false); this.config.setScanNotActiveBrokerInterval(300000000); @@ -80,62 +86,102 @@ public class ReplicasInfoManagerTest { this.heartbeatManager = null; } + private BrokerReplicasInfo.ReplicasInfo getReplicasInfo(String brokerName) { + ControllerResult<Void> syncStateData = this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)); + BrokerReplicasInfo replicasInfo = RemotingSerializable.decode(syncStateData.getBody(), BrokerReplicasInfo.class); + return replicasInfo.getReplicasInfoTable().get(brokerName); + } + public void registerNewBroker(String clusterName, String brokerName, String brokerAddress, - long exceptBrokerId, String exceptMasterAddress) { - // Register new broker - final RegisterBrokerToControllerRequestHeader registerRequest = - new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress); - final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult = this.replicasInfoManager.registerBroker(registerRequest, (s, v) -> true); - apply(registerResult.getEvents()); + Long exceptBrokerId, Long exceptMasterBrokerId) { + + // Get next brokerId + final GetNextBrokerIdRequestHeader getNextBrokerIdRequestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName); + final ControllerResult<GetNextBrokerIdResponseHeader> nextBrokerIdResult = this.replicasInfoManager.getNextBrokerId(getNextBrokerIdRequestHeader); + Long nextBrokerId = nextBrokerIdResult.getResponse().getNextBrokerId(); + String registerCheckCode = brokerAddress + ";" + System.currentTimeMillis(); + // check response - assertEquals(ResponseCode.SUCCESS, registerResult.getResponseCode()); - assertEquals(exceptBrokerId, registerResult.getResponse().getBrokerId()); - assertEquals(exceptMasterAddress, registerResult.getResponse().getMasterAddress()); + assertEquals(ResponseCode.SUCCESS, nextBrokerIdResult.getResponseCode()); + assertEquals(exceptBrokerId, nextBrokerId); + + // Apply brokerId + final ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, nextBrokerId, registerCheckCode); + final ControllerResult<ApplyBrokerIdResponseHeader> applyBrokerIdResult = this.replicasInfoManager.applyBrokerId(applyBrokerIdRequestHeader); + apply(applyBrokerIdResult.getEvents()); + + // check response + assertEquals(ResponseCode.SUCCESS, applyBrokerIdResult.getResponseCode()); + // check it in state machine - final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse(); - assertEquals(exceptBrokerId, replicaInfo.getBrokerId()); - } + BrokerReplicasInfo.ReplicasInfo replicasInfo = getReplicasInfo(brokerName); + BrokerReplicasInfo.ReplicaIdentity replicaIdentity = replicasInfo.getNotInSyncReplicas().stream().filter(x -> x.getBrokerId().equals(nextBrokerId)).findFirst().get(); + assertNotNull(replicaIdentity); + assertEquals(brokerName, replicaIdentity.getBrokerName()); + assertEquals(exceptBrokerId, replicaIdentity.getBrokerId()); + assertEquals(brokerAddress, replicaIdentity.getBrokerAddress()); + + // register success + final RegisterSuccessRequestHeader registerSuccessRequestHeader = new RegisterSuccessRequestHeader(clusterName, brokerName, exceptBrokerId, brokerAddress); + ControllerResult<RegisterSuccessResponseHeader> registerSuccessResult = this.replicasInfoManager.registerSuccess(registerSuccessRequestHeader, (a, b, c) -> true); + apply(registerSuccessResult.getEvents()); - public void brokerElectMaster(String clusterName, long brokerId, String brokerName, String brokerAddress, - boolean isFirstTryElect) { + // check response + assertEquals(ResponseCode.SUCCESS, registerSuccessResult.getResponseCode()); + assertEquals(exceptMasterBrokerId, registerSuccessResult.getResponse().getMasterBrokerId()); - final GetReplicaInfoResponseHeader replicaInfoBefore = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse(); - byte[] body = this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)).getBody(); - BrokerReplicasInfo syncStateDataBefore = RemotingSerializable.decode(body, BrokerReplicasInfo.class); + } + public void brokerElectMaster(String clusterName, Long brokerId, String brokerName, String brokerAddress, boolean isFirstTryElect, boolean expectToBeElected) { + this.brokerElectMaster(clusterName, brokerId, brokerName, brokerAddress, isFirstTryElect,expectToBeElected, (a, b, c) -> true); + } + + public void brokerElectMaster(String clusterName, Long brokerId, String brokerName, String brokerAddress, boolean isFirstTryElect, boolean expectToBeElected, BrokerValidPredicate validPredicate) { + + final GetReplicaInfoResponseHeader replicaInfoBefore = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).getResponse(); + BrokerReplicasInfo.ReplicasInfo syncStateSetInfo = getReplicasInfo(brokerName); // Try elect itself as a master - ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress); - final ControllerResult<ElectMasterResponseHeader> result = this.replicasInfoManager.electMaster(requestHeader, this.electPolicy); + ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); + final ControllerResult<ElectMasterResponseHeader> result = this.replicasInfoManager.electMaster(requestHeader, new DefaultElectPolicy(validPredicate, null)); apply(result.getEvents()); - final GetReplicaInfoResponseHeader replicaInfoAfter = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse(); + final GetReplicaInfoResponseHeader replicaInfoAfter = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).getResponse(); final ElectMasterResponseHeader response = result.getResponse(); if (isFirstTryElect) { // it should be elected // check response assertEquals(ResponseCode.SUCCESS, result.getResponseCode()); - assertEquals(1, response.getMasterEpoch()); - assertEquals(1, response.getSyncStateSetEpoch()); + assertEquals(1, response.getMasterEpoch().intValue()); + assertEquals(1, response.getSyncStateSetEpoch().intValue()); assertEquals(brokerAddress, response.getMasterAddress()); + assertEquals(brokerId, response.getMasterBrokerId()); // check it in state machine assertEquals(brokerAddress, replicaInfoAfter.getMasterAddress()); - assertEquals(1, replicaInfoAfter.getMasterEpoch()); - assertEquals(brokerId, replicaInfoAfter.getBrokerId()); + assertEquals(1, replicaInfoAfter.getMasterEpoch().intValue()); + assertEquals(brokerId, replicaInfoAfter.getMasterBrokerId()); } else { // failed because now master still exist - if (StringUtils.isNotEmpty(replicaInfoBefore.getMasterAddress())) { + if (replicaInfoBefore.getMasterBrokerId() != null && validPredicate.check(clusterName, brokerName, replicaInfoBefore.getMasterBrokerId())) { assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, result.getResponseCode()); assertEquals(replicaInfoBefore.getMasterAddress(), response.getMasterAddress()); assertEquals(replicaInfoBefore.getMasterEpoch(), response.getMasterEpoch()); - assertEquals(brokerId, replicaInfoAfter.getBrokerId()); + assertEquals(replicaInfoBefore.getMasterBrokerId(), response.getMasterBrokerId()); + assertEquals(replicaInfoBefore.getMasterBrokerId(), replicaInfoAfter.getMasterBrokerId()); return; } - if (syncStateDataBefore.getReplicasInfoTable().containsKey(brokerAddress) || this.config.isEnableElectUncleanMaster()) { - // can be elected successfully + if (syncStateSetInfo.isExistInSync(brokerName, brokerId, brokerAddress) || this.config.isEnableElectUncleanMaster()) { + // a new master can be elected successfully assertEquals(ResponseCode.SUCCESS, result.getResponseCode()); - assertEquals(MixAll.MASTER_ID, replicaInfoAfter.getBrokerId()); - assertEquals(brokerId, replicaInfoAfter.getBrokerId()); + assertEquals(replicaInfoBefore.getMasterEpoch() + 1, replicaInfoAfter.getMasterEpoch().intValue()); + + if (expectToBeElected) { + assertEquals(brokerAddress, response.getMasterAddress()); + assertEquals(brokerId, response.getMasterBrokerId()); + assertEquals(brokerAddress, replicaInfoAfter.getMasterAddress()); + assertEquals(brokerId, replicaInfoAfter.getMasterBrokerId()); + } + } else { // failed because elect nothing assertEquals(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, result.getResponseCode()); @@ -143,49 +189,12 @@ public class ReplicasInfoManagerTest { } } - @Test - public void testRegisterNewBroker() { - final RegisterBrokerToControllerRequestHeader registerRequest = - new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9000"); - final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult = this.replicasInfoManager.registerBroker(registerRequest, (s, v) -> true); - apply(registerResult.getEvents()); - final RegisterBrokerToControllerRequestHeader registerRequest0 = - new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9001"); - final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult0 = this.replicasInfoManager.registerBroker(registerRequest0, (s, v) -> true); - apply(registerResult0.getEvents()); - final ElectMasterRequestHeader electMasterRequest = ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", "127.0.0.1:9000"); - ControllerResult<ElectMasterResponseHeader> electMasterResponseHeaderControllerResult = this.replicasInfoManager.electMaster(electMasterRequest, new DefaultElectPolicy()); - apply(electMasterResponseHeaderControllerResult.getEvents()); - final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a")); - final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse(); - assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress()); - assertEquals(1, replicaInfo.getMasterEpoch()); - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add("127.0.0.1:9000"); - newSyncStateSet.add("127.0.0.1:9001"); - alterNewInSyncSet("brokerName-a", "127.0.0.1:9000", 1, newSyncStateSet, 1); - final RegisterBrokerToControllerRequestHeader registerRequest1 = - new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9002"); - final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult1 = this.replicasInfoManager.registerBroker(registerRequest1, (s, v) -> StringUtils.equals(v, "127.0.0.1:9001")); - apply(registerResult1.getEvents()); - assertEquals(3, registerResult1.getResponse().getBrokerId()); - assertEquals("", registerResult1.getResponse().getMasterAddress()); - ElectPolicy electPolicy1 = new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"),null); - final ElectMasterRequestHeader electMasterRequest1 = ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", "127.0.0.1:9002"); - ControllerResult<ElectMasterResponseHeader> electMasterResponseHeaderControllerResult1 = this.replicasInfoManager.electMaster(electMasterRequest1, electPolicy1); - apply(electMasterResponseHeaderControllerResult1.getEvents()); - final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult0 = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a")); - final GetReplicaInfoResponseHeader replicaInfo0 = getInfoResult0.getResponse(); - assertEquals(replicaInfo0.getMasterAddress(), "127.0.0.1:9001"); - assertTrue(replicaInfo0.getMasterAddress().equals("127.0.0.1:9001") || replicaInfo0.getMasterAddress().equals("127.0.0.1:9002")); - assertEquals(replicaInfo0.getMasterEpoch(), 2); - } - - private boolean alterNewInSyncSet(String brokerName, String masterAddress, int masterEpoch, - Set<String> newSyncStateSet, int syncStateSetEpoch) { + private boolean alterNewInSyncSet(String brokerName, Long brokerId, Integer masterEpoch, + Set<Long> newSyncStateSet, Integer syncStateSetEpoch) { final AlterSyncStateSetRequestHeader alterRequest = - new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch); - final ControllerResult<AlterSyncStateSetResponseHeader> result = this.replicasInfoManager.alterSyncStateSet(alterRequest, new SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true); + new AlterSyncStateSetRequestHeader(brokerName, brokerId, masterEpoch); + final ControllerResult<AlterSyncStateSetResponseHeader> result = this.replicasInfoManager.alterSyncStateSet(alterRequest, + new SyncStateSet(newSyncStateSet, syncStateSetEpoch), (cluster, brokerName1, brokerId1) -> true); apply(result.getEvents()); final ControllerResult<GetReplicaInfoResponseHeader> resp = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)); @@ -204,79 +213,106 @@ public class ReplicasInfoManagerTest { } public void mockMetaData() { - registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, ""); - registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, ""); - registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, ""); - brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true); - brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false); - brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false); - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add("127.0.0.1:9000"); - newSyncStateSet.add("127.0.0.1:9001"); - newSyncStateSet.add("127.0.0.1:9002"); - assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, newSyncStateSet, 1)); + registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, null); + registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, null); + registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, null); + brokerElectMaster(DEFAULT_CLUSTER_NAME, 1L, DEFAULT_BROKER_NAME, DEFAULT_IP[0], true, true); + brokerElectMaster(DEFAULT_CLUSTER_NAME, 2L, DEFAULT_BROKER_NAME, DEFAULT_IP[1], false, false); + brokerElectMaster(DEFAULT_CLUSTER_NAME, 3L, DEFAULT_BROKER_NAME, DEFAULT_IP[2], false, false); + final HashSet<Long> newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(1L); + newSyncStateSet.add(2L); + newSyncStateSet.add(3L); + assertTrue(alterNewInSyncSet(DEFAULT_BROKER_NAME, 1L, 1, newSyncStateSet, 1)); } public void mockHeartbeatDataMasterStillAlive() { - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, 10000000000L, null, 1, 1L, -1L, 0); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null, 1, 2L, -1L, 0); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null, 1, 3L, -1L, 0); } public void mockHeartbeatDataHigherEpoch() { - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, -10000L, null, 1, 3L, -1L, 0); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null, 1, 2L, -1L, 0); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null, 0, 3L, -1L, 0); } public void mockHeartbeatDataHigherOffset() { - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, -10000L, null, 1, 3L, -1L, 0); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null, 1, 2L, -1L, 0); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null, 1, 3L, -1L, 0); } public void mockHeartbeatDataHigherPriority() { - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, -10000L, null, 1, 3L, -1L, 3); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null, 1, 3L, -1L, 2); - this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null, + this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null, 1, 3L, -1L, 1); } @Test public void testRegisterBrokerSuccess() { - registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, ""); - registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, ""); - registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, ""); - brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true); - brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false); - brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false); + mockMetaData(); + + BrokerReplicasInfo.ReplicasInfo replicasInfo = getReplicasInfo(DEFAULT_BROKER_NAME); + assertEquals(1L, replicasInfo.getMasterBrokerId().longValue()); + assertEquals(DEFAULT_IP[0], replicasInfo.getMasterAddress()); + assertEquals(1, replicasInfo.getMasterEpoch()); + assertEquals(2, replicasInfo.getSyncStateSetEpoch()); + assertEquals(3, replicasInfo.getInSyncReplicas().size()); + assertEquals(0, replicasInfo.getNotInSyncReplicas().size()); } @Test public void testRegisterWithMasterExistResp() { - registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, ""); - registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, ""); - brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true); - brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false); - registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "127.0.0.1:9000"); - brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false); + registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, null); + registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, null); + brokerElectMaster(DEFAULT_CLUSTER_NAME, 1L, DEFAULT_BROKER_NAME, DEFAULT_IP[0], true, true); + brokerElectMaster(DEFAULT_CLUSTER_NAME, 2L, DEFAULT_BROKER_NAME, DEFAULT_IP[1], false, false); + registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 1L); + brokerElectMaster(DEFAULT_CLUSTER_NAME, 3L, DEFAULT_BROKER_NAME, DEFAULT_IP[2], false, false); + + BrokerReplicasInfo.ReplicasInfo replicasInfo = getReplicasInfo(DEFAULT_BROKER_NAME); + assertEquals(1L, replicasInfo.getMasterBrokerId().longValue()); + assertEquals(DEFAULT_IP[0], replicasInfo.getMasterAddress()); + assertEquals(1, replicasInfo.getMasterEpoch()); + assertEquals(1, replicasInfo.getSyncStateSetEpoch()); + assertEquals(1, replicasInfo.getInSyncReplicas().size()); + assertEquals(2, replicasInfo.getNotInSyncReplicas().size()); + } + + @Test + public void testRegisterWithOldMasterInactive() { + mockMetaData(); + // If now only broker-3 alive, it will be elected to be a new master + brokerElectMaster(DEFAULT_CLUSTER_NAME, 3L, DEFAULT_BROKER_NAME, DEFAULT_IP[2], false, true, (a, b, c) -> c.equals(3L)); + + // Check in statemachine + BrokerReplicasInfo.ReplicasInfo replicasInfo = getReplicasInfo(DEFAULT_BROKER_NAME); + assertEquals(3L, replicasInfo.getMasterBrokerId().longValue()); + assertEquals(DEFAULT_IP[2], replicasInfo.getMasterAddress()); + assertEquals(2, replicasInfo.getMasterEpoch()); + assertEquals(3, replicasInfo.getSyncStateSetEpoch()); + assertEquals(1, replicasInfo.getInSyncReplicas().size()); + assertEquals(2, replicasInfo.getNotInSyncReplicas().size()); } @Test public void testElectMasterOldMasterStillAlive() { mockMetaData(); - final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1"); + final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo); mockHeartbeatDataMasterStillAlive(); final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, @@ -287,100 +323,100 @@ public class ReplicasInfoManagerTest { @Test public void testElectMasterPreferHigherEpoch() { mockMetaData(); - final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1"); + final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo); mockHeartbeatDataHigherEpoch(); final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, electPolicy); final ElectMasterResponseHeader response = cResult.getResponse(); - assertEquals(response.getMasterEpoch(), 2); - assertFalse(response.getMasterAddress().isEmpty()); - assertEquals("127.0.0.1:9001", response.getMasterAddress()); + assertEquals(DEFAULT_IP[1], response.getMasterAddress()); + assertEquals(2L, response.getMasterBrokerId().longValue()); + assertEquals(2, response.getMasterEpoch().intValue()); } @Test public void testElectMasterPreferHigherOffsetWhenEpochEquals() { mockMetaData(); - final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1"); + final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo); mockHeartbeatDataHigherOffset(); final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, electPolicy); final ElectMasterResponseHeader response = cResult.getResponse(); - assertEquals(response.getMasterEpoch(), 2); - assertFalse(response.getMasterAddress().isEmpty()); - assertEquals("127.0.0.1:9002", response.getMasterAddress()); + assertEquals(DEFAULT_IP[2], response.getMasterAddress()); + assertEquals(3L, response.getMasterBrokerId().longValue()); + assertEquals(2, response.getMasterEpoch().intValue()); } @Test public void testElectMasterPreferHigherPriorityWhenEpochAndOffsetEquals() { mockMetaData(); - final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1"); + final ElectMasterRequestHeader request = new ElectMasterRequestHeader(DEFAULT_BROKER_NAME); ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo); mockHeartbeatDataHigherPriority(); final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, electPolicy); final ElectMasterResponseHeader response = cResult.getResponse(); - assertEquals(response.getMasterEpoch(), 2); - assertFalse(response.getMasterAddress().isEmpty()); - assertEquals("127.0.0.1:9002", response.getMasterAddress()); + assertEquals(DEFAULT_IP[2], response.getMasterAddress()); + assertEquals(3L, response.getMasterBrokerId().longValue()); + assertEquals(2, response.getMasterEpoch().intValue()); } @Test public void testElectMaster() { mockMetaData(); - final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1"); + final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, - new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null)); + new DefaultElectPolicy((cluster, brokerName, brokerId) -> !brokerId.equals(1L), null)); final ElectMasterResponseHeader response = cResult.getResponse(); - assertEquals(response.getMasterEpoch(), 2); - assertFalse(response.getMasterAddress().isEmpty()); - assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000"); - + assertEquals(2, response.getMasterEpoch().intValue()); + assertNotEquals(1L, response.getMasterBrokerId().longValue()); + assertNotEquals(DEFAULT_IP[0], response.getMasterAddress()); apply(cResult.getEvents()); - final Set<String> brokerSet = new HashSet<>(); - brokerSet.add("127.0.0.1:9000"); - brokerSet.add("127.0.0.1:9001"); - brokerSet.add("127.0.0.1:9002"); - assertTrue(alterNewInSyncSet("broker1", response.getMasterAddress(), response.getMasterEpoch(), brokerSet, response.getSyncStateSetEpoch())); + final Set<Long> brokerSet = new HashSet<>(); + brokerSet.add(1L); + brokerSet.add(2L); + brokerSet.add(3L); + assertTrue(alterNewInSyncSet(DEFAULT_BROKER_NAME, response.getMasterBrokerId(), response.getMasterEpoch(), brokerSet, response.getSyncStateSetEpoch())); // test admin try to elect a assignedMaster, but it isn't alive - final ElectMasterRequestHeader assignRequest = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", "127.0.0.1:9000"); + final ElectMasterRequestHeader assignRequest = ElectMasterRequestHeader.ofAdminTrigger(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 1L); final ControllerResult<ElectMasterResponseHeader> cResult1 = this.replicasInfoManager.electMaster(assignRequest, - new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null)); + new DefaultElectPolicy((cluster, brokerName, brokerId) -> !brokerId.equals(1L), null)); assertEquals(cResult1.getResponseCode(), ResponseCode.CONTROLLER_ELECT_MASTER_FAILED); // test admin try to elect a assignedMaster but old master still alive, and the old master is equals to assignedMaster - final ElectMasterRequestHeader assignRequest1 = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", response.getMasterAddress()); + final ElectMasterRequestHeader assignRequest1 = ElectMasterRequestHeader.ofAdminTrigger(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, response.getMasterBrokerId()); final ControllerResult<ElectMasterResponseHeader> cResult2 = this.replicasInfoManager.electMaster(assignRequest1, - new DefaultElectPolicy((clusterName, brokerAddress) -> true, null)); + new DefaultElectPolicy((cluster, brokerName, brokerId) -> true, null)); assertEquals(cResult2.getResponseCode(), ResponseCode.CONTROLLER_MASTER_STILL_EXIST); // admin successful elect a assignedMaster. - final ElectMasterRequestHeader assignRequest2 = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", "127.0.0.1:9000"); + final ElectMasterRequestHeader assignRequest2 = ElectMasterRequestHeader.ofAdminTrigger(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 1L); final ControllerResult<ElectMasterResponseHeader> cResult3 = this.replicasInfoManager.electMaster(assignRequest2, - new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals(response.getMasterAddress()), null)); + new DefaultElectPolicy((cluster, brokerName, brokerId) -> !brokerId.equals(response.getMasterBrokerId()), null)); assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS); final ElectMasterResponseHeader response3 = cResult3.getResponse(); - assertEquals(response3.getMasterAddress(), "127.0.0.1:9000"); - assertEquals(response3.getMasterEpoch(), 3); + assertEquals(1L, response3.getMasterBrokerId().longValue()); + assertEquals(DEFAULT_IP[0], response3.getMasterAddress()); + assertEquals(3, response3.getMasterEpoch().intValue()); } @Test public void testAllReplicasShutdownAndRestart() { mockMetaData(); - final HashSet<String> newSyncStateSet = new HashSet<>(); - newSyncStateSet.add("127.0.0.1:9000"); - assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2)); + final HashSet<Long> newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(1L); + assertTrue(alterNewInSyncSet(DEFAULT_BROKER_NAME, 1L, 1, newSyncStateSet, 2)); // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master. - // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed. - final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1"); + // However, the syncStateSet in statemachine is {DEFAULT_IP[0]}, not more replicas can be elected as master, it will be failed. + final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME); final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest, - new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null)); + new DefaultElectPolicy((cluster, brokerName, brokerId) -> !brokerId.equals(1L), null)); final List<EventMessage> events = cResult.getEvents(); assertEquals(events.size(), 1); final ElectMasterEvent event = (ElectMasterEvent) events.get(0); @@ -388,42 +424,42 @@ public class ReplicasInfoManagerTest { apply(cResult.getEvents()); - final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).getResponse(); - assertEquals(replicaInfo.getMasterAddress(), ""); - assertEquals(replicaInfo.getMasterEpoch(), 2); + final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).getResponse(); + assertEquals(replicaInfo.getMasterAddress(), null); + assertEquals(2, replicaInfo.getMasterEpoch().intValue()); } @Test public void testCleanBrokerData() { mockMetaData(); - CleanControllerBrokerDataRequestHeader header1 = new CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000"); - ControllerResult<Void> result1 = this.replicasInfoManager.cleanBrokerData(header1, (cluster, brokerAddr) -> true); + CleanControllerBrokerDataRequestHeader header1 = new CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, "1"); + ControllerResult<Void> result1 = this.replicasInfoManager.cleanBrokerData(header1, (cluster, brokerName, brokerId) -> true); assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, result1.getResponseCode()); - CleanControllerBrokerDataRequestHeader header2 = new CleanControllerBrokerDataRequestHeader("cluster1", "broker1", null); - ControllerResult<Void> result2 = this.replicasInfoManager.cleanBrokerData(header2, (cluster, brokerAddr) -> true); + CleanControllerBrokerDataRequestHeader header2 = new CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, null); + ControllerResult<Void> result2 = this.replicasInfoManager.cleanBrokerData(header2, (cluster, brokerName, brokerId) -> true); assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, result2.getResponseCode()); - assertEquals("Broker broker1 is still alive, clean up failure", result2.getRemark()); + assertEquals("Broker broker-set-a is still alive, clean up failure", result2.getRemark()); - CleanControllerBrokerDataRequestHeader header3 = new CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000"); - ControllerResult<Void> result3 = this.replicasInfoManager.cleanBrokerData(header3, (cluster, brokerAddr) -> false); + CleanControllerBrokerDataRequestHeader header3 = new CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, "1"); + ControllerResult<Void> result3 = this.replicasInfoManager.cleanBrokerData(header3, (cluster, brokerName, brokerId) -> false); assertEquals(ResponseCode.SUCCESS, result3.getResponseCode()); - CleanControllerBrokerDataRequestHeader header4 = new CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002"); - ControllerResult<Void> result4 = this.replicasInfoManager.cleanBrokerData(header4, (cluster, brokerAddr) -> false); + CleanControllerBrokerDataRequestHeader header4 = new CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, "1;2;3"); + ControllerResult<Void> result4 = this.replicasInfoManager.cleanBrokerData(header4, (cluster, brokerName, brokerId) -> false); assertEquals(ResponseCode.SUCCESS, result4.getResponseCode()); - CleanControllerBrokerDataRequestHeader header5 = new CleanControllerBrokerDataRequestHeader("cluster1", "broker12", "127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true); - ControllerResult<Void> result5 = this.replicasInfoManager.cleanBrokerData(header5, (cluster, brokerAddr) -> false); + CleanControllerBrokerDataRequestHeader header5 = new CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, "broker12", "1;2;3", true); + ControllerResult<Void> result5 = this.replicasInfoManager.cleanBrokerData(header5, (cluster, brokerName, brokerId) -> false); assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, result5.getResponseCode()); assertEquals("Broker broker12 is not existed,clean broker data failure.", result5.getRemark()); - CleanControllerBrokerDataRequestHeader header6 = new CleanControllerBrokerDataRequestHeader(null, "broker12", "127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true); - ControllerResult<Void> result6 = this.replicasInfoManager.cleanBrokerData(header6, (cluster, brokerAddr) -> cluster != null); + CleanControllerBrokerDataRequestHeader header6 = new CleanControllerBrokerDataRequestHeader(null, "broker12", "1;2;3", true); + ControllerResult<Void> result6 = this.replicasInfoManager.cleanBrokerData(header6, (cluster, brokerName, brokerId) -> cluster != null); assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, result6.getResponseCode()); - CleanControllerBrokerDataRequestHeader header7 = new CleanControllerBrokerDataRequestHeader(null, "broker1", "127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true); - ControllerResult<Void> result7 = this.replicasInfoManager.cleanBrokerData(header7, (cluster, brokerAddr) -> false); + CleanControllerBrokerDataRequestHeader header7 = new CleanControllerBrokerDataRequestHeader(null, DEFAULT_BROKER_NAME, "1;2;3", true); + ControllerResult<Void> result7 = this.replicasInfoManager.cleanBrokerData(header7, (cluster, brokerName, brokerId) -> false); assertEquals(ResponseCode.SUCCESS, result7.getResponseCode()); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java index c7e410b80..f7ceb82d7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.remoting.protocol.body; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; + import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class BrokerReplicasInfo extends RemotingSerializable { @@ -46,8 +48,8 @@ public class BrokerReplicasInfo extends RemotingSerializable { private Long masterBrokerId; private String masterAddress; - private int masterEpoch; - private int syncStateSetEpoch; + private Integer masterEpoch; + private Integer syncStateSetEpoch; private List<ReplicaIdentity> inSyncReplicas; private List<ReplicaIdentity> notInSyncReplicas; @@ -111,6 +113,19 @@ public class BrokerReplicasInfo extends RemotingSerializable { public Long getMasterBrokerId() { return masterBrokerId; } + + public boolean isExistInSync(String brokerName, Long brokerId, String brokerAddress) { + return this.getInSyncReplicas().contains(new ReplicaIdentity(brokerName, brokerId, brokerAddress)); + } + + public boolean isExistInNotSync(String brokerName, Long brokerId, String brokerAddress) { + return this.getNotInSyncReplicas().contains(new ReplicaIdentity(brokerName, brokerId, brokerAddress)); + } + + public boolean isExistInAllReplicas(String brokerName, Long brokerId, String brokerAddress) { + return this.isExistInSync(brokerName, brokerId, brokerAddress) || this.isExistInNotSync(brokerName, brokerId, brokerAddress); + } + } public static class ReplicaIdentity extends RemotingSerializable { @@ -157,5 +172,18 @@ public class BrokerReplicasInfo extends RemotingSerializable { ", brokerAddress='" + brokerAddress + '\'' + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReplicaIdentity that = (ReplicaIdentity) o; + return brokerName.equals(that.brokerName) && brokerId.equals(that.brokerId) && brokerAddress.equals(that.brokerAddress); + } + + @Override + public int hashCode() { + return Objects.hash(brokerName, brokerId, brokerAddress); + } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java index 2016b2968..91f9e1e8d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java @@ -26,19 +26,22 @@ public class RoleChangeNotifyEntry { private final String masterAddress; + private final Long masterBrokerId; + private final int masterEpoch; private final int syncStateSetEpoch; - public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, int masterEpoch, int syncStateSetEpoch) { + public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch) { this.brokerMemberGroup = brokerMemberGroup; this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; + this.masterBrokerId = masterBrokerId; } public static RoleChangeNotifyEntry convert(ElectMasterResponseHeader header) { - return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), header.getMasterAddress(), header.getMasterEpoch(), header.getSyncStateSetEpoch()); + return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch()); } @@ -57,4 +60,8 @@ public class RoleChangeNotifyEntry { public int getSyncStateSetEpoch() { return syncStateSetEpoch; } + + public Long getMasterBrokerId() { + return masterBrokerId; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java index b32ab7238..3a112a578 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java @@ -21,19 +21,18 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader { private String masterAddress; - private int masterEpoch; - private int syncStateSetEpoch; - // The id of this broker. - private long brokerId; + private Integer masterEpoch; + private Integer syncStateSetEpoch; + private Long masterBrokerId; public NotifyBrokerRoleChangedRequestHeader() { } - public NotifyBrokerRoleChangedRequestHeader(String masterAddress, int masterEpoch, int syncStateSetEpoch, long brokerId) { + public NotifyBrokerRoleChangedRequestHeader(String masterAddress, Long masterBrokerId, Integer masterEpoch, Integer syncStateSetEpoch) { this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; - this.brokerId = brokerId; + this.masterBrokerId = masterBrokerId; } public String getMasterAddress() { @@ -44,38 +43,38 @@ public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader this.masterAddress = masterAddress; } - public int getMasterEpoch() { + public Integer getMasterEpoch() { return masterEpoch; } - public void setMasterEpoch(int masterEpoch) { + public void setMasterEpoch(Integer masterEpoch) { this.masterEpoch = masterEpoch; } - public int getSyncStateSetEpoch() { + public Integer getSyncStateSetEpoch() { return syncStateSetEpoch; } - public void setSyncStateSetEpoch(int syncStateSetEpoch) { + public void setSyncStateSetEpoch(Integer syncStateSetEpoch) { this.syncStateSetEpoch = syncStateSetEpoch; } - public long getBrokerId() { - return brokerId; + public Long getMasterBrokerId() { + return masterBrokerId; } - public void setBrokerId(long brokerId) { - this.brokerId = brokerId; + public void setMasterBrokerId(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; } @Override public String toString() { return "NotifyBrokerRoleChangedRequestHeader{" + - "masterAddress='" + masterAddress + '\'' + - ", masterEpoch=" + masterEpoch + - ", syncStateSetEpoch=" + syncStateSetEpoch + - ", brokerId=" + brokerId + - '}'; + "masterAddress='" + masterAddress + '\'' + + ", masterEpoch=" + masterEpoch + + ", syncStateSetEpoch=" + syncStateSetEpoch + + ", masterBrokerId=" + masterBrokerId + + '}'; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java index 9fbf74e1f..5161d74dc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java @@ -22,12 +22,12 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class AlterSyncStateSetRequestHeader implements CommandCustomHeader { private String brokerName; private Long masterBrokerId; - private int masterEpoch; + private Integer masterEpoch; public AlterSyncStateSetRequestHeader() { } - public AlterSyncStateSetRequestHeader(String brokerName, Long masterBrokerId, int masterEpoch) { + public AlterSyncStateSetRequestHeader(String brokerName, Long masterBrokerId, Integer masterEpoch) { this.brokerName = brokerName; this.masterBrokerId = masterBrokerId; this.masterEpoch = masterEpoch; @@ -49,11 +49,11 @@ public class AlterSyncStateSetRequestHeader implements CommandCustomHeader { this.masterBrokerId = masterBrokerId; } - public int getMasterEpoch() { + public Integer getMasterEpoch() { return masterEpoch; } - public void setMasterEpoch(int masterEpoch) { + public void setMasterEpoch(Integer masterEpoch) { this.masterEpoch = masterEpoch; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index 1544b37db..658e2b592 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -24,8 +24,8 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { private Long masterBrokerId; private String masterAddress; - private int masterEpoch; - private int syncStateSetEpoch; + private Integer masterEpoch; + private Integer syncStateSetEpoch; private BrokerMemberGroup brokerMemberGroup; public ElectMasterResponseHeader() { @@ -39,19 +39,19 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { this.masterAddress = masterAddress; } - public int getMasterEpoch() { + public Integer getMasterEpoch() { return masterEpoch; } - public void setMasterEpoch(int masterEpoch) { + public void setMasterEpoch(Integer masterEpoch) { this.masterEpoch = masterEpoch; } - public int getSyncStateSetEpoch() { + public Integer getSyncStateSetEpoch() { return syncStateSetEpoch; } - public void setSyncStateSetEpoch(int syncStateSetEpoch) { + public void setSyncStateSetEpoch(Integer syncStateSetEpoch) { this.syncStateSetEpoch = syncStateSetEpoch; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java index 9fe0e5316..efc7afc84 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java @@ -21,7 +21,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class GetReplicaInfoRequestHeader implements CommandCustomHeader { private String brokerName; - private String brokerAddress; public GetReplicaInfoRequestHeader() { } @@ -30,10 +29,6 @@ public class GetReplicaInfoRequestHeader implements CommandCustomHeader { this.brokerName = brokerName; } - public GetReplicaInfoRequestHeader(String brokerName, String brokerAddress) { - this.brokerName = brokerName; - this.brokerAddress = brokerAddress; - } public String getBrokerName() { return brokerName; @@ -43,20 +38,11 @@ public class GetReplicaInfoRequestHeader implements CommandCustomHeader { this.brokerName = brokerName; } - public String getBrokerAddress() { - return brokerAddress; - } - - public void setBrokerAddress(String brokerAddress) { - this.brokerAddress = brokerAddress; - } - @Override public String toString() { return "GetReplicaInfoRequestHeader{" + - "brokerName='" + brokerName + '\'' + - ", brokerAddress='" + brokerAddress + '\'' + - '}'; + "brokerName='" + brokerName + '\'' + + '}'; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java index a7b6bbefa..f7aa49e69 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java @@ -23,7 +23,7 @@ public class GetReplicaInfoResponseHeader implements CommandCustomHeader { private Long masterBrokerId; private String masterAddress; - private int masterEpoch; + private Integer masterEpoch; public GetReplicaInfoResponseHeader() { } @@ -36,11 +36,11 @@ public class GetReplicaInfoResponseHeader implements CommandCustomHeader { this.masterAddress = masterAddress; } - public int getMasterEpoch() { + public Integer getMasterEpoch() { return masterEpoch; } - public void setMasterEpoch(int masterEpoch) { + public void setMasterEpoch(Integer masterEpoch) { this.masterEpoch = masterEpoch; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java index 9bc84d195..795afeed8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java @@ -46,8 +46,8 @@ public class CleanControllerBrokerDataRequestHeader implements CommandCustomHead this.isCleanLivingBroker = isCleanLivingBroker; } - public CleanControllerBrokerDataRequestHeader(String clusterName, String brokerName, String brokerAddress) { - this(clusterName, brokerName, brokerAddress, false); + public CleanControllerBrokerDataRequestHeader(String clusterName, String brokerName, String brokerIdSetToClean) { + this(clusterName, brokerName, brokerIdSetToClean, false); } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java index bfc103eb3..c577d6a73 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java @@ -30,6 +30,10 @@ public class ApplyBrokerIdRequestHeader implements CommandCustomHeader { private String registerCheckCode; + public ApplyBrokerIdRequestHeader() { + + } + public ApplyBrokerIdRequestHeader(String clusterName, String brokerName, Long appliedBrokerId, String registerCheckCode) { this.clusterName = clusterName; this.brokerName = brokerName; @@ -57,4 +61,20 @@ public class ApplyBrokerIdRequestHeader implements CommandCustomHeader { public String getRegisterCheckCode() { return registerCheckCode; } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public void setAppliedBrokerId(Long appliedBrokerId) { + this.appliedBrokerId = appliedBrokerId; + } + + public void setRegisterCheckCode(String registerCheckCode) { + this.registerCheckCode = registerCheckCode; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java index 1221c206d..a7f100f77 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java @@ -26,6 +26,8 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader { private String brokerName; + public ApplyBrokerIdResponseHeader() { + } public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) { this.clusterName = clusterName; @@ -46,4 +48,19 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader { } + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java index 90361ff74..aeb222955 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java @@ -26,6 +26,10 @@ public class GetNextBrokerIdRequestHeader implements CommandCustomHeader { private String brokerName; + public GetNextBrokerIdRequestHeader() { + + } + public GetNextBrokerIdRequestHeader(String clusterName, String brokerName) { this.clusterName = clusterName; this.brokerName = brokerName; @@ -51,4 +55,12 @@ public class GetNextBrokerIdRequestHeader implements CommandCustomHeader { public String getBrokerName() { return brokerName; } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java index 3fece1768..7d62722d4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java @@ -28,6 +28,9 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader { private Long nextBrokerId; + public GetNextBrokerIdResponseHeader() { + } + public GetNextBrokerIdResponseHeader(String clusterName, String brokerName) { this(clusterName, brokerName, null); } @@ -59,4 +62,20 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader { public Long getNextBrokerId() { return nextBrokerId; } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java index db5808d6d..cdddcfcd6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java @@ -30,6 +30,9 @@ public class RegisterSuccessRequestHeader implements CommandCustomHeader { private String brokerAddress; + public RegisterSuccessRequestHeader() { + } + public RegisterSuccessRequestHeader(String clusterName, String brokerName, Long brokerId, String brokerAddress) { this.clusterName = clusterName; this.brokerName = brokerName; @@ -57,4 +60,20 @@ public class RegisterSuccessRequestHeader implements CommandCustomHeader { public String getBrokerAddress() { return brokerAddress; } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public void setBrokerId(Long brokerId) { + this.brokerId = brokerId; + } + + public void setBrokerAddress(String brokerAddress) { + this.brokerAddress = brokerAddress; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java index 61e5d8ea1..7bedc95f5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java @@ -30,11 +30,18 @@ public class RegisterSuccessResponseHeader implements CommandCustomHeader { private String masterAddress; + private Integer masterEpoch; + + private Integer syncStateSetEpoch; + @Override public void checkFields() throws RemotingCommandException { } + public RegisterSuccessResponseHeader() { + } + public RegisterSuccessResponseHeader(String clusterName, String brokerName) { this.clusterName = clusterName; this.brokerName = brokerName; @@ -48,6 +55,22 @@ public class RegisterSuccessResponseHeader implements CommandCustomHeader { this.masterAddress = masterAddress; } + public void setMasterEpoch(Integer masterEpoch) { + this.masterEpoch = masterEpoch; + } + + public void setSyncStateSetEpoch(Integer syncStateSetEpoch) { + this.syncStateSetEpoch = syncStateSetEpoch; + } + + public Integer getMasterEpoch() { + return masterEpoch; + } + + public Integer getSyncStateSetEpoch() { + return syncStateSetEpoch; + } + public String getClusterName() { return clusterName; } @@ -63,4 +86,12 @@ public class RegisterSuccessResponseHeader implements CommandCustomHeader { public String getMasterAddress() { return masterAddress; } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } }
