TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125868400


##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx, RemotingCommand
                 request);
         }
         switch (request.getCode()) {
-            case CONTROLLER_ALTER_SYNC_STATE_SET: {
-                final AlterSyncStateSetRequestHeader controllerRequest = 
(AlterSyncStateSetRequestHeader) 
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
-                final SyncStateSet syncStateSet = 
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().alterSyncStateSet(controllerRequest, 
syncStateSet);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader electMasterRequest = 
(ElectMasterRequestHeader) 
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().electMaster(electMasterRequest);
-                if (future != null) {
-                    final RemotingCommand response = 
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
-
-                    if (null != responseHeader) {
-                        if 
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-                            
this.controllerManager.notifyBrokerRoleChanged(responseHeader, 
electMasterRequest.getClusterName());
-                        }
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_REGISTER_BROKER: {
-                final RegisterBrokerToControllerRequestHeader 
controllerRequest = (RegisterBrokerToControllerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().registerBroker(controllerRequest);
-                if (future != null) {
-                    final RemotingCommand response = 
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final RegisterBrokerToControllerResponseHeader 
responseHeader = (RegisterBrokerToControllerResponseHeader) 
response.readCustomHeader();
-                    if (responseHeader != null && responseHeader.getBrokerId() 
>= 0) {
-                        
this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), 
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                            controllerRequest.getEpoch(), 
controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), 
controllerRequest.getElectionPriority());
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_GET_REPLICA_INFO: {
-                final GetReplicaInfoRequestHeader controllerRequest = 
(GetReplicaInfoRequestHeader) 
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getReplicaInfo(controllerRequest);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_GET_METADATA_INFO: {
-                return 
this.controllerManager.getController().getControllerMetadata();
-            }
-            case BROKER_HEARTBEAT: {
-                final BrokerHeartbeatRequestHeader requestHeader = 
(BrokerHeartbeatRequestHeader) 
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), 
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), 
requestHeader.getBrokerId(),
-                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), 
requestHeader.getEpoch(), requestHeader.getMaxOffset(), 
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
-                return 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat 
success");
-            }
-            case CONTROLLER_GET_SYNC_STATE_DATA: {
-                if (request.getBody() != null) {
-                    final List<String> brokerNames = 
RemotingSerializable.decode(request.getBody(), List.class);
-                    if (brokerNames != null && brokerNames.size() > 0) {
-                        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getSyncStateData(brokerNames);
-                        if (future != null) {
-                            return future.get(WAIT_TIMEOUT_OUT, 
TimeUnit.SECONDS);
-                        }
-                    }
-                }
-                break;
-            }
+            case CONTROLLER_ALTER_SYNC_STATE_SET:
+                return this.handleAlterSyncStateSet(ctx, request);
+            case CONTROLLER_ELECT_MASTER:
+                return this.handleControllerElectMaster(ctx, request);
+            case CONTROLLER_GET_REPLICA_INFO:
+                return this.handleControllerGetReplicaInfo(ctx, request);
+            case CONTROLLER_GET_METADATA_INFO:
+                return this.handleControllerGetMetadataInfo(ctx, request);
+            case BROKER_HEARTBEAT:
+                return this.handleBrokerHeartbeat(ctx, request);
+            case CONTROLLER_GET_SYNC_STATE_DATA:
+                return this.handleControllerGetSyncStateData(ctx, request);
             case UPDATE_CONTROLLER_CONFIG:
-                return this.updateControllerConfig(ctx, request);
+                return this.handleUpdateControllerConfig(ctx, request);
             case GET_CONTROLLER_CONFIG:
-                return this.getControllerConfig(ctx, request);
+                return this.handleGetControllerConfig(ctx, request);
             case CLEAN_BROKER_DATA:
-                final CleanControllerBrokerDataRequestHeader requestHeader = 
(CleanControllerBrokerDataRequestHeader) 
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().cleanBrokerData(requestHeader);
-                if (null != future) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
+                return this.handleCleanBrokerData(ctx, request);
+            case CONTROLLER_GET_NEXT_BROKER_ID:
+                return this.handleGetNextBrokerId(ctx, request);
+            case CONTROLLER_APPLY_BROKER_ID:
+                return this.handleApplyBrokerId(ctx, request);
+            case CONTROLLER_REGISTER_BROKER:
+                return this.handleRegisterBroker(ctx, request);
             default: {
                 final String error = " request type " + request.getCode() + " 
not supported";
                 return 
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, 
error);
             }
         }
+    }
+
+    private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final AlterSyncStateSetRequestHeader controllerRequest = 
(AlterSyncStateSetRequestHeader) 
request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+        final SyncStateSet syncStateSet = 
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().alterSyncStateSet(controllerRequest, 
syncStateSet);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
         return RemotingCommand.createResponseCommand(null);
     }
 
-    @Override
-    public boolean rejectRequest() {
-        return false;
+    private RemotingCommand handleControllerElectMaster(ChannelHandlerContext 
ctx,
+        RemotingCommand request) throws Exception {
+        final ElectMasterRequestHeader electMasterRequest = 
(ElectMasterRequestHeader) 
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().electMaster(electMasterRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, 
TimeUnit.SECONDS);
+
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                if 
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                    
this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+                }
+            }
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand 
handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+                                                           RemotingCommand 
request) throws Exception {
+        final GetReplicaInfoRequestHeader controllerRequest = 
(GetReplicaInfoRequestHeader) 
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getReplicaInfo(controllerRequest);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand 
handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand 
request) {
+        return this.controllerManager.getController().getControllerMetadata();
+    }
+
+    private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final BrokerHeartbeatRequestHeader requestHeader = 
(BrokerHeartbeatRequestHeader) 
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+        if (requestHeader.getBrokerId() == null) {
+            return 
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, 
"Heart beat with empty brokerId");
+        }
+        
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), 
requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), 
requestHeader.getBrokerId(),
+                requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), 
requestHeader.getEpoch(), requestHeader.getMaxOffset(), 
requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+        return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, 
"Heart beat success");
+    }
+
+    private RemotingCommand 
handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+                                                             RemotingCommand 
request) throws Exception {
+        if (request.getBody() != null) {
+            final List<String> brokerNames = 
RemotingSerializable.decode(request.getBody(), List.class);
+            if (brokerNames != null && brokerNames.size() > 0) {
+                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getSyncStateData(brokerNames);
+                if (future != null) {
+                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                }
+            }
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final CleanControllerBrokerDataRequestHeader requestHeader = 
(CleanControllerBrokerDataRequestHeader) 
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().cleanBrokerData(requestHeader);
+        if (null != future) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
     }
 
-    private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, 
RemotingCommand request) {
+    private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = 
(GetNextBrokerIdRequestHeader) 
request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().getNextBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = 
(ApplyBrokerIdRequestHeader) 
request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().applyBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
+        RegisterBrokerToControllerRequestHeader requestHeader = 
(RegisterBrokerToControllerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().registerBroker(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }

Review Comment:
   > 注册成功是否也能当作一次心跳,就和之前的一样
   
   
原本是这么想的,但是我发现需要加的心跳参数太多了,而且这个接口也只有注册流程中会用到(注册流程我们也会通过手动触发心跳来保证当前节点的活跃)。注册成功后,心跳定时任务就启动了。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to