TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125868400
##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand
processRequest(ChannelHandlerContext ctx, RemotingCommand
request);
}
switch (request.getCode()) {
- case CONTROLLER_ALTER_SYNC_STATE_SET: {
- final AlterSyncStateSetRequestHeader controllerRequest =
(AlterSyncStateSetRequestHeader)
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
- final SyncStateSet syncStateSet =
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().alterSyncStateSet(controllerRequest,
syncStateSet);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
- }
- case CONTROLLER_ELECT_MASTER: {
- final ElectMasterRequestHeader electMasterRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(electMasterRequest);
- if (future != null) {
- final RemotingCommand response =
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
-
- if (null != responseHeader) {
- if
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-
this.controllerManager.notifyBrokerRoleChanged(responseHeader,
electMasterRequest.getClusterName());
- }
- }
- return response;
- }
- break;
- }
- case CONTROLLER_REGISTER_BROKER: {
- final RegisterBrokerToControllerRequestHeader
controllerRequest = (RegisterBrokerToControllerRequestHeader)
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().registerBroker(controllerRequest);
- if (future != null) {
- final RemotingCommand response =
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final RegisterBrokerToControllerResponseHeader
responseHeader = (RegisterBrokerToControllerResponseHeader)
response.readCustomHeader();
- if (responseHeader != null && responseHeader.getBrokerId()
>= 0) {
-
this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(),
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(),
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
- controllerRequest.getEpoch(),
controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(),
controllerRequest.getElectionPriority());
- }
- return response;
- }
- break;
- }
- case CONTROLLER_GET_REPLICA_INFO: {
- final GetReplicaInfoRequestHeader controllerRequest =
(GetReplicaInfoRequestHeader)
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getReplicaInfo(controllerRequest);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
- }
- case CONTROLLER_GET_METADATA_INFO: {
- return
this.controllerManager.getController().getControllerMetadata();
- }
- case BROKER_HEARTBEAT: {
- final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader)
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(),
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(),
requestHeader.getBrokerId(),
- requestHeader.getHeartbeatTimeoutMills(), ctx.channel(),
requestHeader.getEpoch(), requestHeader.getMaxOffset(),
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
- return
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat
success");
- }
- case CONTROLLER_GET_SYNC_STATE_DATA: {
- if (request.getBody() != null) {
- final List<String> brokerNames =
RemotingSerializable.decode(request.getBody(), List.class);
- if (brokerNames != null && brokerNames.size() > 0) {
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getSyncStateData(brokerNames);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT,
TimeUnit.SECONDS);
- }
- }
- }
- break;
- }
+ case CONTROLLER_ALTER_SYNC_STATE_SET:
+ return this.handleAlterSyncStateSet(ctx, request);
+ case CONTROLLER_ELECT_MASTER:
+ return this.handleControllerElectMaster(ctx, request);
+ case CONTROLLER_GET_REPLICA_INFO:
+ return this.handleControllerGetReplicaInfo(ctx, request);
+ case CONTROLLER_GET_METADATA_INFO:
+ return this.handleControllerGetMetadataInfo(ctx, request);
+ case BROKER_HEARTBEAT:
+ return this.handleBrokerHeartbeat(ctx, request);
+ case CONTROLLER_GET_SYNC_STATE_DATA:
+ return this.handleControllerGetSyncStateData(ctx, request);
case UPDATE_CONTROLLER_CONFIG:
- return this.updateControllerConfig(ctx, request);
+ return this.handleUpdateControllerConfig(ctx, request);
case GET_CONTROLLER_CONFIG:
- return this.getControllerConfig(ctx, request);
+ return this.handleGetControllerConfig(ctx, request);
case CLEAN_BROKER_DATA:
- final CleanControllerBrokerDataRequestHeader requestHeader =
(CleanControllerBrokerDataRequestHeader)
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().cleanBrokerData(requestHeader);
- if (null != future) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
+ return this.handleCleanBrokerData(ctx, request);
+ case CONTROLLER_GET_NEXT_BROKER_ID:
+ return this.handleGetNextBrokerId(ctx, request);
+ case CONTROLLER_APPLY_BROKER_ID:
+ return this.handleApplyBrokerId(ctx, request);
+ case CONTROLLER_REGISTER_BROKER:
+ return this.handleRegisterBroker(ctx, request);
default: {
final String error = " request type " + request.getCode() + "
not supported";
return
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
}
}
+ }
+
+ private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ final AlterSyncStateSetRequestHeader controllerRequest =
(AlterSyncStateSetRequestHeader)
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+ final SyncStateSet syncStateSet =
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().alterSyncStateSet(controllerRequest,
syncStateSet);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
return RemotingCommand.createResponseCommand(null);
}
- @Override
- public boolean rejectRequest() {
- return false;
+ private RemotingCommand handleControllerElectMaster(ChannelHandlerContext
ctx,
+ RemotingCommand request) throws Exception {
+ final ElectMasterRequestHeader electMasterRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(electMasterRequest);
+ if (future != null) {
+ final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT,
TimeUnit.SECONDS);
+
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ if
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+
this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+ }
+ }
+ return response;
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand
handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+ RemotingCommand
request) throws Exception {
+ final GetReplicaInfoRequestHeader controllerRequest =
(GetReplicaInfoRequestHeader)
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getReplicaInfo(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand
handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand
request) {
+ return this.controllerManager.getController().getControllerMetadata();
+ }
+
+ private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader)
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+ if (requestHeader.getBrokerId() == null) {
+ return
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST,
"Heart beat with empty brokerId");
+ }
+
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(),
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(),
requestHeader.getBrokerId(),
+ requestHeader.getHeartbeatTimeoutMills(), ctx.channel(),
requestHeader.getEpoch(), requestHeader.getMaxOffset(),
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+ return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
"Heart beat success");
+ }
+
+ private RemotingCommand
handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+ RemotingCommand
request) throws Exception {
+ if (request.getBody() != null) {
+ final List<String> brokerNames =
RemotingSerializable.decode(request.getBody(), List.class);
+ if (brokerNames != null && brokerNames.size() > 0) {
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getSyncStateData(brokerNames);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ }
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final CleanControllerBrokerDataRequestHeader requestHeader =
(CleanControllerBrokerDataRequestHeader)
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().cleanBrokerData(requestHeader);
+ if (null != future) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
}
- private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx,
RemotingCommand request) {
+ private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final GetNextBrokerIdRequestHeader requestHeader =
(GetNextBrokerIdRequestHeader)
request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+ CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getNextBrokerId(requestHeader);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final ApplyBrokerIdRequestHeader requestHeader =
(ApplyBrokerIdRequestHeader)
request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+ CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().applyBrokerId(requestHeader);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ RegisterBrokerToControllerRequestHeader requestHeader =
(RegisterBrokerToControllerRequestHeader)
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+ CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().registerBroker(requestHeader);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
Review Comment:
> 注册成功是否也能当作一次心跳,就和之前的一样?

原本是这么想的,但是我发现需要加的心跳参数太多了,而且这个接口也只有注册流程中会用到(注册流程我们也会通过手动触发心跳来保证当前节点的活跃)。注册成功后,心跳定时任务就启动了。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]