RongtongJin commented on code in PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#discussion_r870914660


##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 0.regularly syncing controller metadata, change controller leader address, 
both master and slave will start this timed task.
+ * 1.regularly syncing metadata from controllers, and changing broker roles 
and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start 
this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int SYNC_BROKER_METADATA_PERIOD = 5 * 1000;
+    public static final int SYNC_CONTROLLER_METADATA_PERIOD = 10 * 1000;
+    public static final int CHECK_SYNC_STATE_SET_PERIOD = 8 * 1000;
+
+    private final ScheduledExecutorService scheduledService = 
Executors.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI outerAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.outerAPI = brokerController.getBrokerOuterAPI();
+        this.haService = (AutoSwitchHAService) 
brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.controllerAddresses = new ArrayList<>(Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    public void start() {
+        if (!schedulingSyncControllerMetadata()) {
+            return;
+        }
+
+        if (!registerBroker()) {
+            return;
+        }
+
+        schedulingSyncBrokerMetadata();
+    }
+
+    public void shutdown() {
+        this.scheduledService.shutdown();
+    }
+
+    public void changeToMaster(final int newMasterEpoch, final int 
syncStateSetEpoch) {
+        synchronized (this) {
+            if (newMasterEpoch > this.masterEpoch) {
+                LOGGER.info("Begin to change to master, brokerName:{}, 
replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), 
this.localAddress, newMasterEpoch);
+
+                // Change record
+                this.masterAddress = this.localAddress;
+                this.masterEpoch = newMasterEpoch;
+
+                // Change sync state set
+                final HashSet<String> newSyncStateSet = new HashSet<>();
+                newSyncStateSet.add(this.localAddress);
+                changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
+                schedulingCheckSyncStateSet();
+
+                // Handle the slave synchronise
+                handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
+
+                
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
+                
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
+
+                // Register broker to name-srv
+                try {
+                    this.brokerController.registerBrokerAll(true, false, 
this.brokerController.getBrokerConfig().isForceRegister());
+                } catch (final Throwable e) {
+                    LOGGER.error("Error happen when register broker to 
name-srv, Failed to change broker to master", e);
+                    return;
+                }
+
+                // Notify ha service, change to master
+                this.haService.changeToMaster(newMasterEpoch);
+                LOGGER.info("Change broker {} to master success, masterEpoch 
{}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, 
syncStateSetEpoch);
+            }
+        }
+    }
+
+    public void changeToSlave(final String newMasterAddress, final int 
newMasterEpoch) {
+        synchronized (this) {
+            if (newMasterEpoch > this.masterEpoch) {
+                LOGGER.info("Begin to change to slave, brokerName={}, 
replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), 
this.localAddress, this.brokerConfig.getBrokerId());
+
+                // Change record
+                this.masterAddress = newMasterAddress;
+                this.masterEpoch = newMasterEpoch;
+                stopCheckSyncStateSet();
+
+                // Change config
+                
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
+                this.brokerController.changeSpecialServiceStatus(false);
+
+                // Handle the slave synchronise
+                handleSlaveSynchronize(BrokerRole.SLAVE);
+
+                // Register broker to name-srv
+                try {
+                    this.brokerController.registerBrokerAll(true, false, 
this.brokerController.getBrokerConfig().isForceRegister());
+                } catch (final Throwable e) {
+                    LOGGER.error("Error happen when register broker to 
name-srv, Failed to change broker to slave", e);
+                    return;
+                }

Review Comment:
   建议放在线程池异步执行,主要是有耗时的rpc调用,也不适合放在锁中



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 0.regularly syncing controller metadata, change controller leader address, 
both master and slave will start this timed task.
+ * 1.regularly syncing metadata from controllers, and changing broker roles 
and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start 
this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int SYNC_BROKER_METADATA_PERIOD = 5 * 1000;
+    public static final int SYNC_CONTROLLER_METADATA_PERIOD = 10 * 1000;
+    public static final int CHECK_SYNC_STATE_SET_PERIOD = 8 * 1000;
+
+    private final ScheduledExecutorService scheduledService = 
Executors.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI outerAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.outerAPI = brokerController.getBrokerOuterAPI();
+        this.haService = (AutoSwitchHAService) 
brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();

Review Comment:
   这里建议新建一个controllerAddr配置,不是每个nameserver都包含controller



##########
test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.autoswitchrole;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.hacontroller.ReplicasManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
+
+    private ControllerConfig controllerConfig;
+    private NamesrvController namesrvController;
+
+    private BrokerController brokerController1;
+    private BrokerController brokerController2;
+
+    private MessageStoreConfig storeConfig1;
+    private MessageStoreConfig storeConfig2;
+    private BrokerConfig brokerConfig1;
+    private BrokerConfig brokerConfig2;
+    private NettyServerConfig brokerNettyServerConfig1;
+    private NettyServerConfig brokerNettyServerConfig2;
+
+
+    @Before
+    public void init() throws Exception {
+        super.initialize();
+
+        // Startup namesrv
+        final String peers = String.format("n0-localhost:%d", 30000);
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(31000);
+
+        this.controllerConfig = buildControllerConfig("n0", peers);
+        this.namesrvController = new NamesrvController(new NamesrvConfig(), 
serverConfig, new NettyClientConfig(), controllerConfig);
+        assertTrue(namesrvController.initialize());
+        namesrvController.start();
+
+        final String namesrvAddress = "127.0.0.1:31000;";
+        for (int i = 0; i < 2; i++) {
+            final MessageStoreConfig storeConfig = 
buildMessageStoreConfig("broker" + i, 20000 + i);
+            final BrokerConfig brokerConfig = new BrokerConfig();
+            brokerConfig.setListenPort(21000 + i);
+            brokerConfig.setNamesrvAddr(namesrvAddress);
+            brokerConfig.setMetaDataHosts(namesrvAddress);
+
+            final NettyServerConfig nettyServerConfig = new 
NettyServerConfig();
+            nettyServerConfig.setListenPort(22000 + i);
+
+            final BrokerController brokerController = new 
BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), 
storeConfig);
+            assertTrue(brokerController.initialize());
+            brokerController.start();
+            System.out.println("Start controller success");
+            Thread.sleep(1000);
+            // The first is master
+            if (i == 0) {
+                
assertTrue(brokerController.getReplicasManager().isMasterState());
+                this.brokerController1 = brokerController;
+                this.storeConfig1 = storeConfig;
+                this.brokerConfig1 = brokerConfig;
+                this.brokerNettyServerConfig1 = nettyServerConfig;
+            } else {
+                
assertFalse(brokerController.getReplicasManager().isMasterState());
+                this.brokerController2 = brokerController;
+                this.storeConfig2 = storeConfig;
+                this.brokerConfig2 = brokerConfig;
+                this.brokerNettyServerConfig2 = nettyServerConfig;
+            }
+        }
+
+        // Wait slave connecting to master
+        Thread.sleep(15000);
+    }
+
+    public void mockData() throws Exception {
+        System.out.println("Begin test");
+        final MessageStore messageStore = brokerController1.getMessageStore();
+        putMessage(messageStore);
+
+        // Check slave message
+        checkMessage(brokerController2.getMessageStore(), 10, 0);
+    }
+
+    @Test
+    public void testCheckSyncStateSet() throws Exception {
+        mockData();
+
+        // Check sync state set
+        final ReplicasManager replicasManager = 
brokerController1.getReplicasManager();
+        final SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
+        assertEquals(2, syncStateSet.getSyncStateSet().size());
+    }
+
+    @Test
+    public void testChangeMaster() throws Exception {
+        mockData();
+
+        // Let master shutdown
+        brokerController1.shutdown();
+        Thread.sleep(5000);
+
+        // The slave should change to master
+        assertTrue(brokerController2.getReplicasManager().isMasterState());
+        assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 
2);
+
+        // Restart old master, it should be slave
+        brokerController1 = new BrokerController(brokerConfig1, 
brokerNettyServerConfig1, new NettyClientConfig(), storeConfig1);
+        brokerController1.initialize();
+        brokerController1.start();
+
+        Thread.sleep(20000);
+        assertFalse(brokerController1.getReplicasManager().isMasterState());
+        
assertEquals(brokerController1.getReplicasManager().getMasterAddress(), 
brokerController2.getReplicasManager().getLocalAddress());
+
+        // Put another batch messages
+        final MessageStore messageStore = brokerController2.getMessageStore();
+        putMessage(messageStore);
+
+        // Check slave message
+        checkMessage(brokerController1.getMessageStore(), 20, 0);
+    }
+
+

Review Comment:
   最好是能增加一下测试,能覆盖到截断算法



##########
container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java:
##########
@@ -113,11 +100,32 @@ public void run2() {
             }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), 
TimeUnit.MILLISECONDS));
         }
 
+        if (this.messageStoreConfig.isStartupControllerMode()) {
+            scheduleSendHeartbeat();
+        }
+
         if (brokerConfig.isSkipPreOnline()) {
             startServiceWithoutCondition();
         }
     }
 
+    private void scheduleSendHeartbeat() {
+        
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new
 AbstractBrokerRunnable(this.getBrokerIdentity()) {
+            @Override
+            public void run2() {
+                if (isIsolated) {
+                    return;
+                }
+                try {
+                    InnerBrokerController.this.sendHeartbeat();
+                } catch (Exception e) {
+                    BrokerController.LOG.error("sendHeartbeat Exception", e);
+                }
+
+            }
+        }, 1000, brokerConfig.getBrokerHeartbeatInterval(), 
TimeUnit.MILLISECONDS));
+    }

Review Comment:
   把父类BrokerController的方法改为protected,直接使用即可



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 0.regularly syncing controller metadata, change controller leader address, 
both master and slave will start this timed task.
+ * 1.regularly syncing metadata from controllers, and changing broker roles 
and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start 
this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int SYNC_BROKER_METADATA_PERIOD = 5 * 1000;
+    public static final int SYNC_CONTROLLER_METADATA_PERIOD = 10 * 1000;
+    public static final int CHECK_SYNC_STATE_SET_PERIOD = 8 * 1000;
+
+    private final ScheduledExecutorService scheduledService = 
Executors.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI outerAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.outerAPI = brokerController.getBrokerOuterAPI();
+        this.haService = (AutoSwitchHAService) 
brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.controllerAddresses = new ArrayList<>(Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    public void start() {
+        if (!schedulingSyncControllerMetadata()) {
+            return;
+        }
+
+        if (!registerBroker()) {
+            return;
+        }
+
+        schedulingSyncBrokerMetadata();
+    }
+
+    public void shutdown() {
+        this.scheduledService.shutdown();
+    }
+
+    public void changeToMaster(final int newMasterEpoch, final int 
syncStateSetEpoch) {
+        synchronized (this) {
+            if (newMasterEpoch > this.masterEpoch) {
+                LOGGER.info("Begin to change to master, brokerName:{}, 
replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), 
this.localAddress, newMasterEpoch);
+
+                // Change record
+                this.masterAddress = this.localAddress;
+                this.masterEpoch = newMasterEpoch;
+
+                // Change sync state set
+                final HashSet<String> newSyncStateSet = new HashSet<>();
+                newSyncStateSet.add(this.localAddress);
+                changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
+                schedulingCheckSyncStateSet();
+
+                // Handle the slave synchronise
+                handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
+
+                
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
+                
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);

Review Comment:
   这里忘记调用changeSpecialServiceStatus(true)了



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -42,22 +43,33 @@
 
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
+    /**
+     * Handshake header buffer size. Schema: state ordinal + 
flag(isSyncFromLastFile) + slaveId + slaveAddressLength.
+     */
+    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;

Review Comment:
   还是把slaveAddress内容放到header里面而不是body里面,单独把ip放在body里面有点奇怪



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to