This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 468a5d85dd9a6762e63a6fcb06aafac472cbfa40 Author: TheR1sing3un <[email protected]> AuthorDate: Sun Feb 5 13:11:51 2023 +0800 feat(broker): implement the general register to controller protocol 1. implement the general register to controller protocol --- .../broker/controller/ReplicasManager.java | 182 ++++++++++++++++++++- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 40 +++++ .../rocketmq/client/impl/MQClientAPIImpl.java | 4 +- .../rocketmq/controller/ControllerManager.java | 3 + .../processor/ControllerRequestProcessor.java | 9 + .../rocketmq/remoting/protocol/RequestCode.java | 6 + .../rocketmq/remoting/protocol/ResponseCode.java | 2 + .../register/ApplyBrokerIdRequestHeader.java | 7 + .../register/ApplyBrokerIdResponseHeader.java | 10 +- .../register/GetNextBrokerIdResponseHeader.java | 4 + ...ader.java => RegisterSuccessRequestHeader.java} | 13 +- ...der.java => RegisterSuccessResponseHeader.java} | 11 +- .../rocketmq/store/config/MessageStoreConfig.java | 20 +++ .../store/ha/autoswitch/BrokerMetadata.java | 82 ++++++++++ .../rocketmq/store/ha/autoswitch/MetadataFile.java | 60 +++++++ .../store/ha/autoswitch/TempBrokerMetadata.java | 92 +++++++++++ 16 files changed, 519 insertions(+), 26 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 8a03015f7..e3c9382f8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -47,9 +47,14 @@ import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; +import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata; +import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST; @@ -78,6 +83,8 @@ public class ReplicasManager { private volatile String controllerLeaderAddress = ""; private volatile State state = State.INITIAL; + private RegisterState registerState = RegisterState.INITIAL; + private ScheduledFuture<?> checkSyncStateSetTaskFuture; private ScheduledFuture<?> slaveSyncFuture; @@ -85,6 +92,10 @@ public class ReplicasManager { private Long masterBrokerId; + private BrokerMetadata brokerMetadata; + + private TempBrokerMetadata tempBrokerMetadata; + private Set<Long> syncStateSet; private int syncStateSetEpoch = 0; private String masterAddress = ""; @@ -103,6 +114,8 @@ public class ReplicasManager { this.availableControllerAddresses = new ConcurrentHashMap<>(); this.syncStateSet = new HashSet<>(); this.localAddress = brokerController.getBrokerAddr(); + this.brokerMetadata = new BrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathMetadata()); + this.tempBrokerMetadata = new TempBrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathTempMetadata()); } public long getConfirmOffset() { @@ -113,12 +126,22 @@ public class ReplicasManager { INITIAL, FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, - FIRST_TIME_WAIT_MASTER_IS_ELECTED, + FIRST_TIME_REGISTER_TO_CONTROLLER_DONE, RUNNING, SHUTDOWN, } + enum RegisterState { + INITIAL, + + CREATE_TEMP_METADATA_FILE_DONE, + + CREATE_METADATA_FILE_DONE, + + REGISTERED + } + public void start() { updateControllerAddr(); scanAvailableControllerAddresses(); @@ -157,13 +180,13 @@ public class ReplicasManager { if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) { if (registerBrokerToController()) { LOGGER.info("First time register broker success"); - this.state = State.FIRST_TIME_WAIT_MASTER_IS_ELECTED; + this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE; } else { return false; } } - if (this.state == State.FIRST_TIME_WAIT_MASTER_IS_ELECTED) { + if (this.state == State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) { if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) { LOGGER.info("Master in this broker set is elected"); this.state = State.RUNNING; @@ -343,7 +366,7 @@ public class ReplicasManager { } private boolean registerBrokerToController() { - // Register this broker to controller, get brokerId and masterAddress. + // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file. try { final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(), @@ -373,6 +396,157 @@ public class ReplicasManager { } } + /** + * Register broker to controller, and persist the metadata to file + * @return whether registering process succeeded + */ + private boolean registerBrokerToController2() { + try { + // 1. confirm now registering state + confirmNowRegisteringState(); + // 2. get next assigning brokerId, and create temp metadata file + if (this.registerState == RegisterState.INITIAL) { + Long nextBrokerId = getNextBrokerId(); + if (nextBrokerId == null || !createTempMetadataFile(nextBrokerId)) { + return false; + } + this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE; + } + // 3. apply brokerId to controller, and create metadata file + if (this.registerState == RegisterState.CREATE_TEMP_METADATA_FILE_DONE) { + if (!applyBrokerId()) { + // apply broker id failed, means that this brokerId has been used + // delete temp metadata file + this.tempBrokerMetadata.clear(); + // back to the first step + this.registerState = RegisterState.INITIAL; + return false; + } + if (!createMetadataFileAndDeleteTemp()) { + return false; + } + this.registerState = RegisterState.CREATE_METADATA_FILE_DONE; + } + // 4. register success + if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) { + if (!registerSuccess()) { + return false; + } + this.registerState = RegisterState.REGISTERED; + } + return true; + } catch (final Exception e) { + LOGGER.error("Failed to register broker to controller", e); + return false; + } + } + + /** + * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set + * @return next brokerId in this broker-set + */ + private Long getNextBrokerId() { + try { + GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress); + return nextBrokerIdResp.getNextBrokerId(); + } catch (Exception e) { + LOGGER.error("fail to get next broker id from controller", e); + return null; + } + } + + /** + * Create temp metadata file in local file system, records the brokerId and registerCheckCode + * @param brokerId the brokerId that is expected to be assigned + * @return whether the temp meta file is created successfully + */ + + private boolean createTempMetadataFile(Long brokerId) { + // generate register check code, format like that: $ipAddress;$timestamp + String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis(); + try { + this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode); + return true; + } catch (Exception e) { + LOGGER.error("update and persist temp broker metadata file failed", e); + this.tempBrokerMetadata.clear(); + return false; + } + } + + /** + * Send applyBrokerId request to controller + * @return whether controller has assigned this brokerId for this broker + */ + private boolean applyBrokerId() { + try { + ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), + tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress); + return true; + + } catch (Exception e) { + LOGGER.error("fail to apply broker id", e); + return false; + } + } + + /** + * Create metadata file and delete temp metadata file + * @return whether process success + */ + private boolean createMetadataFileAndDeleteTemp() { + // create metadata file and delete temp metadata file + try { + this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId()); + this.tempBrokerMetadata.clear(); + return true; + } catch (Exception e) { + LOGGER.error("fail to create metadata file", e); + this.brokerMetadata.clear(); + return false; + } + } + + /** + * Send registerSuccess request to inform controller that now broker has been registered successfully and controller should update broker ipAddress if changed + * @return whether request success + */ + private boolean registerSuccess() { + try { + RegisterSuccessResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, localAddress, controllerLeaderAddress); + return true; + } catch (Exception e) { + LOGGER.error("fail to send registerSuccess request to controller", e); + return false; + } + } + + /** + * Confirm the registering state now + */ + private void confirmNowRegisteringState() { + // 1. check if metadata exist + try { + this.brokerMetadata.readFromFile(); + } catch (Exception e) { + LOGGER.error("read metadata file failed", e); + } + if (this.brokerMetadata.isLoaded()) { + this.registerState = RegisterState.CREATE_METADATA_FILE_DONE; + this.brokerId = brokerMetadata.getBrokerId(); + return; + } + // 2. check if temp metadata exist + try { + this.tempBrokerMetadata.readFromFile(); + } catch (Exception e) { + LOGGER.error("read temp metadata file failed", e); + } + if (this.tempBrokerMetadata.isLoaded()) { + this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE; + } + } + /** * Scheduling sync broker metadata form controller. */ diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index f123549cf..82e9ea33e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -107,8 +107,14 @@ import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSet import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequestHeader; @@ -130,6 +136,7 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMetrics; import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; +import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_ID_INVALID; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED; import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED; @@ -1213,6 +1220,39 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } + public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, final String controllerAddress) throws Exception { + final GetNextBrokerIdRequestHeader requestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName); + final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NEXT_BROKER_ID, requestHeader); + final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); + assert response != null; + if (response.getCode() == SUCCESS) { + return (GetNextBrokerIdResponseHeader) response.decodeCommandCustomHeader(GetNextBrokerIdResponseHeader.class); + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception { + final ApplyBrokerIdRequestHeader requestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode); + final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.APPLY_BROKER_ID, requestHeader); + final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); + assert response != null; + if (response.getCode() == SUCCESS) { + return (ApplyBrokerIdResponseHeader) response.decodeCommandCustomHeader(ApplyBrokerIdResponseHeader.class); + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public RegisterSuccessResponseHeader registerSuccess(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception { + final RegisterSuccessRequestHeader requestHeader = new RegisterSuccessRequestHeader(clusterName, brokerName, brokerId, brokerAddress); + final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_SUCCESS, requestHeader); + final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); + assert response != null; + if (response.getCode() == SUCCESS) { + return (RegisterSuccessResponseHeader) response.decodeCommandCustomHeader(RegisterSuccessResponseHeader.class); + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + /** * Get broker replica info */ diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bb5ced847..ab8880625 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -3076,14 +3076,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { } public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName, - String brokerAddr) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException { + Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException { //get controller leader address final GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr); assert controllerMetaData != null; assert controllerMetaData.getControllerLeaderAddress() != null; final String leaderAddress = controllerMetaData.getControllerLeaderAddress(); - ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerAddr); + ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, electRequestHeader); final RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 662403192..116607cc1 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -204,6 +204,9 @@ public class ControllerManager { controllerRemotingServer.registerProcessor(RequestCode.UPDATE_CONTROLLER_CONFIG, controllerRequestProcessor, this.controllerRequestExecutor); controllerRemotingServer.registerProcessor(RequestCode.GET_CONTROLLER_CONFIG, controllerRequestProcessor, this.controllerRequestExecutor); controllerRemotingServer.registerProcessor(RequestCode.CLEAN_BROKER_DATA, controllerRequestProcessor, this.controllerRequestExecutor); + controllerRemotingServer.registerProcessor(RequestCode.GET_NEXT_BROKER_ID, controllerRequestProcessor, this.controllerRequestExecutor); + controllerRemotingServer.registerProcessor(RequestCode.APPLY_BROKER_ID, controllerRequestProcessor, this.controllerRequestExecutor); + controllerRemotingServer.registerProcessor(RequestCode.REGISTER_SUCCESS, controllerRequestProcessor, this.controllerRequestExecutor); } public void start() { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java index ec84f4ca7..6dea0e1d7 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoReq import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; +import static org.apache.rocketmq.remoting.protocol.RequestCode.APPLY_BROKER_ID; import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT; import static org.apache.rocketmq.remoting.protocol.RequestCode.CLEAN_BROKER_DATA; import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET; @@ -54,6 +55,8 @@ import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_R import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA; import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_REGISTER_BROKER; import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONTROLLER_CONFIG; +import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_NEXT_BROKER_ID; +import static org.apache.rocketmq.remoting.protocol.RequestCode.REGISTER_SUCCESS; import static org.apache.rocketmq.remoting.protocol.RequestCode.UPDATE_CONTROLLER_CONFIG; /** @@ -99,6 +102,12 @@ public class ControllerRequestProcessor implements NettyRequestProcessor { return this.handleGetControllerConfig(ctx, request); case CLEAN_BROKER_DATA: return this.handleCleanBrokerData(ctx, request); + case GET_NEXT_BROKER_ID: + return this.handleGetNextBrokerId(ctx, request); + case APPLY_BROKER_ID: + return this.handleApplyBrokerId(ctx, request); + case REGISTER_SUCCESS: + return this.handleRegisterSuccess(ctx, request); default: { final String error = " request type " + request.getCode() + " not supported"; return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 6c93a5d46..62fd0198f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -275,5 +275,11 @@ public class RequestCode { */ public static final int CLEAN_BROKER_DATA = 1011; + public static final int GET_NEXT_BROKER_ID = 1012; + + public static final int APPLY_BROKER_ID = 1013; + + public static final int REGISTER_SUCCESS = 1014; + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java index 8e4e3770e..6554fe509 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java @@ -120,4 +120,6 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int CONTROLLER_ELECT_MASTER_FAILED = 2012; public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2013; + + public static final int CONTROLLER_BROKER_ID_INVALID = 2014; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java index e8e1ea944..780f519a7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java @@ -30,6 +30,13 @@ public class ApplyBrokerIdRequestHeader implements CommandCustomHeader { private String registerCheckCode; + public ApplyBrokerIdRequestHeader(String clusterName, String brokerName, Long appliedBrokerId, String registerCheckCode) { + this.clusterName = clusterName; + this.brokerName = brokerName; + this.appliedBrokerId = appliedBrokerId; + this.registerCheckCode = registerCheckCode; + } + @Override public void checkFields() throws RemotingCommandException { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java index 382297f21..d83164747 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java @@ -26,17 +26,11 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader { private String brokerName; - // if nextBrokerId isn't null, means that matched ApplyBrokerIdRequest is failed. - private Long nextBrokerId; - public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) { - this(clusterName, brokerName, null); - } - public ApplyBrokerIdResponseHeader(String clusterName, String brokerName, Long nextBrokerId) { + public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) { this.clusterName = clusterName; this.brokerName = brokerName; - this.nextBrokerId = nextBrokerId; } @@ -45,7 +39,6 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader { return "ApplyBrokerIdResponseHeader{" + "clusterName='" + clusterName + '\'' + ", brokerName='" + brokerName + '\'' + - ", nextBrokerId=" + nextBrokerId + '}'; } @@ -53,4 +46,5 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java index 04f522a7a..ddec9b0ea 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java @@ -47,4 +47,8 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + + public Long getNextBrokerId() { + return nextBrokerId; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java similarity index 73% copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java copy to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java index e8e1ea944..721509ac9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java @@ -20,15 +20,22 @@ package org.apache.rocketmq.remoting.protocol.header.controller.register; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class ApplyBrokerIdRequestHeader implements CommandCustomHeader { +public class RegisterSuccessRequestHeader implements CommandCustomHeader { private String clusterName; private String brokerName; - private Long appliedBrokerId; + private Long brokerId; - private String registerCheckCode; + private String brokerAddress; + + public RegisterSuccessRequestHeader(String clusterName, String brokerName, Long brokerId, String brokerAddress) { + this.clusterName = clusterName; + this.brokerName = brokerName; + this.brokerId = brokerId; + this.brokerAddress = brokerAddress; + } @Override public void checkFields() throws RemotingCommandException { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java similarity index 83% copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java copy to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java index e8e1ea944..953190480 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java @@ -20,18 +20,11 @@ package org.apache.rocketmq.remoting.protocol.header.controller.register; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class ApplyBrokerIdRequestHeader implements CommandCustomHeader { - - private String clusterName; - - private String brokerName; - - private Long appliedBrokerId; - - private String registerCheckCode; +public class RegisterSuccessResponseHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index e29fdc2b0..0a722c765 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -41,6 +41,10 @@ public class MessageStoreConfig { private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store" + File.separator + "epochFileCheckpoint"; + private String storePathMetadata = storePathRootDir + File.separator + "metadata"; + + private String storePathTempMetadata = storePathRootDir + File.separator + "metadata-temp"; + private String readOnlyCommitLogStorePaths = null; // CommitLog file size,default is 1G @@ -627,6 +631,22 @@ public class MessageStoreConfig { this.storePathEpochFile = storePathEpochFile; } + public String getStorePathMetadata() { + return storePathMetadata; + } + + public void setStorePathMetadata(String storePathMetadata) { + this.storePathMetadata = storePathMetadata; + } + + public String getStorePathTempMetadata() { + return storePathTempMetadata; + } + + public void setStorePathTempMetadata(String storePathTempMetadata) { + this.storePathTempMetadata = storePathTempMetadata; + } + public String getDeleteWhen() { return deleteWhen; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java new file mode 100644 index 000000000..24f400204 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store.ha.autoswitch; + +import org.apache.commons.lang3.StringUtils; + +public class BrokerMetadata extends MetadataFile { + + private String clusterName; + + private String brokerName; + + private Long brokerId; + + public BrokerMetadata(String filePath) { + this.filePath = filePath; + } + + public void updateAndPersist(String clusterName, String brokerName, Long brokerId) throws Exception { + this.clusterName = clusterName; + this.brokerName = brokerName; + this.brokerId = brokerId; + writeToFile(); + } + + @Override + public String encodeToStr() { + StringBuilder sb = new StringBuilder(); + sb.append(clusterName).append(";"); + sb.append(brokerName).append(";"); + sb.append(brokerId); + return sb.toString(); + } + + @Override + public void decodeFromStr(String dataStr) { + if (dataStr == null) return; + String[] dataArr = dataStr.split(";"); + this.clusterName = dataArr[0]; + this.brokerName = dataArr[1]; + this.brokerId = Long.valueOf(dataArr[2]); + } + + @Override + public boolean isLoaded() { + return StringUtils.isNotEmpty(this.clusterName) && StringUtils.isNotEmpty(this.brokerName) && brokerId != null; + } + + @Override + public void clearInMem() { + this.clusterName = null; + this.brokerName = null; + this.brokerId = null; + } + + public String getBrokerName() { + return brokerName; + } + + public Long getBrokerId() { + return brokerId; + } + + public String getClusterName() { + return clusterName; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java new file mode 100644 index 000000000..2e3c3ba99 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store.ha.autoswitch; + +import org.apache.rocketmq.common.MixAll; + +import java.io.File; + +public abstract class MetadataFile { + + protected String filePath; + + public abstract String encodeToStr(); + + public abstract void decodeFromStr(String dataStr); + + public abstract boolean isLoaded(); + + public abstract void clearInMem(); + + public void writeToFile() throws Exception { + deleteFile(); + MixAll.string2File(encodeToStr(), this.filePath); + } + + public void readFromFile() throws Exception { + String dataStr = MixAll.file2String(filePath); + decodeFromStr(dataStr); + } + public boolean fileExists() { + File file = new File(filePath); + return file.exists(); + } + + public void deleteFile() { + File file = new File(filePath); + file.deleteOnExit(); + } + + public void clear() { + clearInMem(); + deleteFile(); + } + +} diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java new file mode 100644 index 000000000..31b4aa5e8 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store.ha.autoswitch; + +import org.apache.commons.lang3.StringUtils; + +public class TempBrokerMetadata extends MetadataFile { + + private String clusterName; + + private String brokerName; + + private Long brokerId; + + private String registerCheckCode; + + public TempBrokerMetadata(String filePath) { + this(filePath, null, null, null, null); + } + + public TempBrokerMetadata(String filePath, String clusterName, String brokerName, Long brokerId, String registerCheckCode) { + this.filePath = filePath; + this.clusterName = clusterName; + this.brokerId = brokerId; + this.brokerName = brokerName; + this.registerCheckCode = registerCheckCode; + } + + public void updateAndPersist(String clusterName, String brokerName, Long brokerId, String registerCheckCode) throws Exception { + this.clusterName = clusterName; + this.brokerName = brokerName; + this.brokerId = brokerId; + this.registerCheckCode = registerCheckCode; + writeToFile(); + } + + @Override + public String encodeToStr() { + StringBuilder sb = new StringBuilder(); + sb.append(clusterName).append(";"); + sb.append(brokerName).append(";"); + sb.append(brokerId).append(";"); + sb.append(registerCheckCode); + return sb.toString(); + } + + @Override + public void decodeFromStr(String dataStr) { + if (dataStr == null) return; + String[] dataArr = dataStr.split(";"); + this.clusterName = dataArr[0]; + this.brokerName = dataArr[1]; + this.brokerId = Long.valueOf(dataArr[2]); + this.registerCheckCode = dataArr[3]; + } + + @Override + public boolean isLoaded() { + return StringUtils.isNotEmpty(this.clusterName) && StringUtils.isNotEmpty(this.brokerName) && brokerId != null && StringUtils.isNotEmpty(this.registerCheckCode); + } + + @Override + public void clearInMem() { + this.clusterName = null; + this.brokerName = null; + this.brokerId = null; + this.registerCheckCode = null; + } + + public Long getBrokerId() { + return brokerId; + } + + public String getRegisterCheckCode() { + return registerCheckCode; + } +}
