RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125893831
##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand
processRequest(ChannelHandlerContext ctx, RemotingCommand
request);
}
switch (request.getCode()) {
- case CONTROLLER_ALTER_SYNC_STATE_SET: {
- final AlterSyncStateSetRequestHeader controllerRequest =
(AlterSyncStateSetRequestHeader)
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
- final SyncStateSet syncStateSet =
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().alterSyncStateSet(controllerRequest,
syncStateSet);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
- }
- case CONTROLLER_ELECT_MASTER: {
- final ElectMasterRequestHeader electMasterRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(electMasterRequest);
- if (future != null) {
- final RemotingCommand response =
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
-
- if (null != responseHeader) {
- if
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-
this.controllerManager.notifyBrokerRoleChanged(responseHeader,
electMasterRequest.getClusterName());
- }
- }
- return response;
- }
- break;
- }
- case CONTROLLER_REGISTER_BROKER: {
- final RegisterBrokerToControllerRequestHeader
controllerRequest = (RegisterBrokerToControllerRequestHeader)
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().registerBroker(controllerRequest);
- if (future != null) {
- final RemotingCommand response =
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final RegisterBrokerToControllerResponseHeader
responseHeader = (RegisterBrokerToControllerResponseHeader)
response.readCustomHeader();
- if (responseHeader != null && responseHeader.getBrokerId()
>= 0) {
-
this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(),
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(),
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
- controllerRequest.getEpoch(),
controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(),
controllerRequest.getElectionPriority());
- }
- return response;
- }
- break;
- }
- case CONTROLLER_GET_REPLICA_INFO: {
- final GetReplicaInfoRequestHeader controllerRequest =
(GetReplicaInfoRequestHeader)
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getReplicaInfo(controllerRequest);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
- }
- case CONTROLLER_GET_METADATA_INFO: {
- return
this.controllerManager.getController().getControllerMetadata();
- }
- case BROKER_HEARTBEAT: {
- final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader)
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(),
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(),
requestHeader.getBrokerId(),
- requestHeader.getHeartbeatTimeoutMills(), ctx.channel(),
requestHeader.getEpoch(), requestHeader.getMaxOffset(),
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
- return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
- }
- case CONTROLLER_GET_SYNC_STATE_DATA: {
- if (request.getBody() != null) {
- final List<String> brokerNames =
RemotingSerializable.decode(request.getBody(), List.class);
- if (brokerNames != null && brokerNames.size() > 0) {
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getSyncStateData(brokerNames);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT,
TimeUnit.SECONDS);
- }
- }
- }
- break;
- }
+ case CONTROLLER_ALTER_SYNC_STATE_SET:
+ return this.handleAlterSyncStateSet(ctx, request);
+ case CONTROLLER_ELECT_MASTER:
+ return this.handleControllerElectMaster(ctx, request);
+ case CONTROLLER_GET_REPLICA_INFO:
+ return this.handleControllerGetReplicaInfo(ctx, request);
+ case CONTROLLER_GET_METADATA_INFO:
+ return this.handleControllerGetMetadataInfo(ctx, request);
+ case BROKER_HEARTBEAT:
+ return this.handleBrokerHeartbeat(ctx, request);
+ case CONTROLLER_GET_SYNC_STATE_DATA:
+ return this.handleControllerGetSyncStateData(ctx, request);
case UPDATE_CONTROLLER_CONFIG:
- return this.updateControllerConfig(ctx, request);
+ return this.handleUpdateControllerConfig(ctx, request);
case GET_CONTROLLER_CONFIG:
- return this.getControllerConfig(ctx, request);
+ return this.handleGetControllerConfig(ctx, request);
case CLEAN_BROKER_DATA:
- final CleanControllerBrokerDataRequestHeader requestHeader =
(CleanControllerBrokerDataRequestHeader)
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().cleanBrokerData(requestHeader);
- if (null != future) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
+ return this.handleCleanBrokerData(ctx, request);
+ case CONTROLLER_GET_NEXT_BROKER_ID:
+ return this.handleGetNextBrokerId(ctx, request);
+ case CONTROLLER_APPLY_BROKER_ID:
+ return this.handleApplyBrokerId(ctx, request);
+ case CONTROLLER_REGISTER_BROKER:
+ return this.handleRegisterBroker(ctx, request);
default: {
final String error = " request type " + request.getCode() + " not supported";
return
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
}
}
+ }
+
+ private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ final AlterSyncStateSetRequestHeader controllerRequest =
(AlterSyncStateSetRequestHeader)
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+ final SyncStateSet syncStateSet =
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().alterSyncStateSet(controllerRequest,
syncStateSet);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
return RemotingCommand.createResponseCommand(null);
}
- @Override
- public boolean rejectRequest() {
- return false;
+ private RemotingCommand handleControllerElectMaster(ChannelHandlerContext
ctx,
+ RemotingCommand request) throws Exception {
+ final ElectMasterRequestHeader electMasterRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(electMasterRequest);
+ if (future != null) {
+ final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT,
TimeUnit.SECONDS);
+
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ if
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+
this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+ }
+ }
+ return response;
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand
handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+ RemotingCommand
request) throws Exception {
+ final GetReplicaInfoRequestHeader controllerRequest =
(GetReplicaInfoRequestHeader)
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getReplicaInfo(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand
handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand
request) {
+ return this.controllerManager.getController().getControllerMetadata();
+ }
+
+ private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader)
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+ if (requestHeader.getBrokerId() == null) {
+ return
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST,
"Heart beat with empty brokerId");
+ }
+
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(),
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(),
requestHeader.getBrokerId(),
+ requestHeader.getHeartbeatTimeoutMills(), ctx.channel(),
requestHeader.getEpoch(), requestHeader.getMaxOffset(),
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+ return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
"Heart beat success");
+ }
+
+ private RemotingCommand
handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+ RemotingCommand
request) throws Exception {
+ if (request.getBody() != null) {
+ final List<String> brokerNames =
RemotingSerializable.decode(request.getBody(), List.class);
+ if (brokerNames != null && brokerNames.size() > 0) {
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getSyncStateData(brokerNames);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ }
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final CleanControllerBrokerDataRequestHeader requestHeader =
(CleanControllerBrokerDataRequestHeader)
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().cleanBrokerData(requestHeader);
+ if (null != future) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
}
- private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx,
RemotingCommand request) {
+ private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final GetNextBrokerIdRequestHeader requestHeader =
(GetNextBrokerIdRequestHeader)
request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+ CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().getNextBrokerId(requestHeader);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ final ApplyBrokerIdRequestHeader requestHeader =
(ApplyBrokerIdRequestHeader)
request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+ CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().applyBrokerId(requestHeader);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
+ RegisterBrokerToControllerRequestHeader requestHeader =
(RegisterBrokerToControllerRequestHeader)
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+ CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().registerBroker(requestHeader);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
Review Comment:
>
好的
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org