RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125893831


##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx, RemotingCommand
                 request);
         }
         switch (request.getCode()) {
-            case CONTROLLER_ALTER_SYNC_STATE_SET: {
-                final AlterSyncStateSetRequestHeader controllerRequest = 
(AlterSyncStateSetRequestHeader) 
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
-                final SyncStateSet syncStateSet = 
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().alterSyncStateSet(controllerRequest, 
syncStateSet);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader electMasterRequest = 
(ElectMasterRequestHeader) 
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().electMaster(electMasterRequest);
-                if (future != null) {
-                    final RemotingCommand response = 
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
-
-                    if (null != responseHeader) {
-                        if 
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-                            
this.controllerManager.notifyBrokerRoleChanged(responseHeader, 
electMasterRequest.getClusterName());
-                        }
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_REGISTER_BROKER: {
-                final RegisterBrokerToControllerRequestHeader 
controllerRequest = (RegisterBrokerToControllerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().registerBroker(controllerRequest);
-                if (future != null) {
-                    final RemotingCommand response = 
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final RegisterBrokerToControllerResponseHeader 
responseHeader = (RegisterBrokerToControllerResponseHeader) 
response.readCustomHeader();
-                    if (responseHeader != null && responseHeader.getBrokerId() 
>= 0) {
-                        
this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), 
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                            controllerRequest.getEpoch(), 
controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), 
controllerRequest.getElectionPriority());
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_GET_REPLICA_INFO: {
-                final GetReplicaInfoRequestHeader controllerRequest = 
(GetReplicaInfoRequestHeader) 
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getReplicaInfo(controllerRequest);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_GET_METADATA_INFO: {
-                return 
this.controllerManager.getController().getControllerMetadata();
-            }
-            case BROKER_HEARTBEAT: {
-                final BrokerHeartbeatRequestHeader requestHeader = 
(BrokerHeartbeatRequestHeader) 
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), 
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), 
requestHeader.getBrokerId(),
-                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), 
requestHeader.getEpoch(), requestHeader.getMaxOffset(), 
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
-                return 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat 
success");
-            }
-            case CONTROLLER_GET_SYNC_STATE_DATA: {
-                if (request.getBody() != null) {
-                    final List<String> brokerNames = 
RemotingSerializable.decode(request.getBody(), List.class);
-                    if (brokerNames != null && brokerNames.size() > 0) {
-                        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getSyncStateData(brokerNames);
-                        if (future != null) {
-                            return future.get(WAIT_TIMEOUT_OUT, 
TimeUnit.SECONDS);
-                        }
-                    }
-                }
-                break;
-            }
+            case CONTROLLER_ALTER_SYNC_STATE_SET:
+                return this.handleAlterSyncStateSet(ctx, request);
+            case CONTROLLER_ELECT_MASTER:
+                return this.handleControllerElectMaster(ctx, request);
+            case CONTROLLER_GET_REPLICA_INFO:
+                return this.handleControllerGetReplicaInfo(ctx, request);
+            case CONTROLLER_GET_METADATA_INFO:
+                return this.handleControllerGetMetadataInfo(ctx, request);
+            case BROKER_HEARTBEAT:
+                return this.handleBrokerHeartbeat(ctx, request);
+            case CONTROLLER_GET_SYNC_STATE_DATA:
+                return this.handleControllerGetSyncStateData(ctx, request);
             case UPDATE_CONTROLLER_CONFIG:
-                return this.updateControllerConfig(ctx, request);
+                return this.handleUpdateControllerConfig(ctx, request);
             case GET_CONTROLLER_CONFIG:
-                return this.getControllerConfig(ctx, request);
+                return this.handleGetControllerConfig(ctx, request);
             case CLEAN_BROKER_DATA:
-                final CleanControllerBrokerDataRequestHeader requestHeader = 
(CleanControllerBrokerDataRequestHeader) 
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().cleanBrokerData(requestHeader);
-                if (null != future) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
+                return this.handleCleanBrokerData(ctx, request);
+            case CONTROLLER_GET_NEXT_BROKER_ID:
+                return this.handleGetNextBrokerId(ctx, request);
+            case CONTROLLER_APPLY_BROKER_ID:
+                return this.handleApplyBrokerId(ctx, request);
+            case CONTROLLER_REGISTER_BROKER:
+                return this.handleRegisterBroker(ctx, request);
             default: {
                 final String error = " request type " + request.getCode() + " 
not supported";
                 return 
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, 
error);
             }
         }
+    }
+
+    private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final AlterSyncStateSetRequestHeader controllerRequest = 
(AlterSyncStateSetRequestHeader) 
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+        final SyncStateSet syncStateSet = 
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().alterSyncStateSet(controllerRequest, 
syncStateSet);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
         return RemotingCommand.createResponseCommand(null);
     }
 
-    @Override
-    public boolean rejectRequest() {
-        return false;
+    private RemotingCommand handleControllerElectMaster(ChannelHandlerContext 
ctx,
+        RemotingCommand request) throws Exception {
+        final ElectMasterRequestHeader electMasterRequest = 
(ElectMasterRequestHeader) 
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().electMaster(electMasterRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, 
TimeUnit.SECONDS);
+
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                if 
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                    
this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+                }
+            }
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand 
handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+                                                           RemotingCommand 
request) throws Exception {
+        final GetReplicaInfoRequestHeader controllerRequest = 
(GetReplicaInfoRequestHeader) 
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getReplicaInfo(controllerRequest);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand 
handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand 
request) {
+        return this.controllerManager.getController().getControllerMetadata();
+    }
+
+    private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final BrokerHeartbeatRequestHeader requestHeader = 
(BrokerHeartbeatRequestHeader) 
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+        if (requestHeader.getBrokerId() == null) {
+            return 
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, 
"Heart beat with empty brokerId");
+        }
+        
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), 
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), 
requestHeader.getBrokerId(),
+                requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), 
requestHeader.getEpoch(), requestHeader.getMaxOffset(), 
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+        return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, 
"Heart beat success");
+    }
+
+    private RemotingCommand 
handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+                                                             RemotingCommand 
request) throws Exception {
+        if (request.getBody() != null) {
+            final List<String> brokerNames = 
RemotingSerializable.decode(request.getBody(), List.class);
+            if (brokerNames != null && brokerNames.size() > 0) {
+                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getSyncStateData(brokerNames);
+                if (future != null) {
+                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                }
+            }
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final CleanControllerBrokerDataRequestHeader requestHeader = 
(CleanControllerBrokerDataRequestHeader) 
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().cleanBrokerData(requestHeader);
+        if (null != future) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
     }
 
-    private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, 
RemotingCommand request) {
+    private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = 
(GetNextBrokerIdRequestHeader) 
request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getNextBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = 
(ApplyBrokerIdRequestHeader) 
request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().applyBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        RegisterBrokerToControllerRequestHeader requestHeader = 
(RegisterBrokerToControllerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().registerBroker(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }

Review Comment:
   > 
   
   好的



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to