RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1126054850


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -296,61 +347,241 @@ private void handleSlaveSynchronize(final BrokerRole 
role) {
         }
     }
 
-    private boolean registerBrokerToController() {
-        // Register this broker to controller, get brokerId and masterAddress.
+    private boolean brokerElect() {
+        // Broker try to elect itself as a master in broker set.
         try {
-            final RegisterBrokerToControllerResponseHeader registerResponse = 
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-                this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.localAddress, 
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
-                this.haService.getLastEpoch(), 
this.brokerController.getMessageStore().getMaxPhyOffset(), 
this.brokerConfig.getBrokerElectionPriority());
-            final String newMasterAddress = 
registerResponse.getMasterAddress();
-            if (StringUtils.isNoneEmpty(newMasterAddress)) {
-                if (StringUtils.equals(newMasterAddress, this.localAddress)) {
-                    changeToMaster(registerResponse.getMasterEpoch(), 
registerResponse.getSyncStateSetEpoch());
-                } else {
-                    changeToSlave(newMasterAddress, 
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
-                }
-                // Set isolated to false, make broker can register to namesrv 
regularly
-                brokerController.setIsolated(false);
-            } else {
-                LOGGER.warn("No master in controller");
+            ElectMasterResponseHeader tryElectResponse = 
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(), this.brokerControllerId);
+            final String masterAddress = tryElectResponse.getMasterAddress();
+            final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
+            if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
+                LOGGER.warn("Now no master in broker set");
                 return false;
             }
+
+            if (masterBrokerId.equals(this.brokerControllerId)) {
+                changeToMaster(tryElectResponse.getMasterEpoch(), 
tryElectResponse.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, 
tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to try elect", e);
+            return false;
+        }
+    }
+
+    public void sendHeartbeatToController() {
+        final List<String> controllerAddresses = 
this.getAvailableControllerAddresses();
+        for (String controllerAddress : controllerAddresses) {
+            if (StringUtils.isNotEmpty(controllerAddress)) {
+                this.brokerOuterAPI.sendHeartbeatToController(
+                        controllerAddress,
+                        this.brokerConfig.getBrokerClusterName(),
+                        this.brokerAddress,
+                        this.brokerConfig.getBrokerName(),
+                        this.brokerControllerId,
+                        this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                        this.brokerConfig.isInBrokerContainer(), 
this.getLastEpoch(),
+                        
this.brokerController.getMessageStore().getMaxPhyOffset(),
+                        this.getConfirmOffset(),
+                        this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+                        this.brokerConfig.getBrokerElectionPriority()
+                );
+            }
+        }
+    }
+
+    /**
+     * Register broker to controller, and persist the metadata to file
+     * @return whether registering process succeeded
+     */
+    private boolean register() {
+        try {
+            // 1. confirm now registering state
+            confirmNowRegisteringState();
+            // 2. get next assigning brokerId, and create temp metadata file
+            if (this.registerState == RegisterState.INITIAL) {
+                Long nextBrokerId = getNextBrokerId();
+                if (nextBrokerId == null || 
!createTempMetadataFile(nextBrokerId)) {
+                    return false;
+                }
+                this.registerState = 
RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+            }
+            // 3. apply brokerId to controller, and create metadata file
+            if (this.registerState == 
RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
+                if (!applyBrokerId()) {
+                    // apply broker id failed, means that this brokerId has 
been used
+                    // delete temp metadata file
+                    this.tempBrokerMetadata.clear();
+                    // back to the first step
+                    this.registerState = RegisterState.INITIAL;
+                    return false;
+                }
+                if (!createMetadataFileAndDeleteTemp()) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            }
+            // 4. register
+            if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) 
{
+                if (!registerBrokerToController()) {
+                    return false;
+                }
+                this.registerState = RegisterState.REGISTERED;
+            }
             return true;
         } catch (final Exception e) {
             LOGGER.error("Failed to register broker to controller", e);
             return false;
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning 
brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = 
this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId 
and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: 
$ipAddress;$timestamp
+        String registerCheckCode = this.brokerAddress + ";" + 
System.currentTimeMillis();
+        try {
+            
this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file 
failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = 
this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), 
tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", e, 
tempBrokerMetadata.getBrokerId());
+            return false;
+        }
+    }
+
+    /**
+     * Create metadata file and delete temp metadata file
+     * @return whether process success
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        // create metadata file and delete temp metadata file
+        try {
+            
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            this.brokerControllerId = this.brokerMetadata.getBrokerId();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send registerSuccess request to inform controller that now broker has 
been registered successfully and controller should update broker ipAddress if 
changed
+     * @return whether request success
+     */
+    private boolean registerBrokerToController() {
+        try {
+            RegisterBrokerToControllerResponseHeader response = 
this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, 
controllerLeaderAddress);
+            final Long masterBrokerId = response.getMasterBrokerId();
+            final String masterAddress = response.getMasterAddress();
+            if (masterBrokerId == null) {
+                return true;
+            }
+            if (this.brokerControllerId.equals(masterBrokerId)) {
+                changeToMaster(response.getMasterEpoch(), 
response.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, response.getMasterEpoch(), 
masterBrokerId);
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", 
e);
+            return false;
+        }
+    }
+
+    /**
+     * Confirm the registering state now
+     */
+    private void confirmNowRegisteringState() {
+        // 1. check if metadata exist
+        try {
+            this.brokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read metadata file failed", e);
+        }
+        if (this.brokerMetadata.isLoaded()) {
+            this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            this.brokerControllerId = brokerMetadata.getBrokerId();
+            return;
+        }
+        // 2. check if temp metadata exist
+        try {
+            this.tempBrokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read temp metadata file failed", e);
+        }

Review Comment:
   
如果brokerConfig中的brokerName与metadata文件中的brokerName不一致，建议日志告警出来（提示问题，如果是重建，建议删除xxx文件），然后退出。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to