RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r863323873


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+    /**
+     * Transfer header buffer size. Schema: state ordinal + maxOffset
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new 
AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new 
AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = 
ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, 
DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new 
FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                
AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: 
" + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " 
NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState 
haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoRecoverHAClient close connection with master 
{}", this.masterHaAddress.get());
+                this.changeCurrentState(HAConnectionState.READY);
+            } catch (IOException e) {
+                LOGGER.warn("CloseMaster exception. ", e);
+            }
+
+            this.lastReadTimestamp = 0;
+            this.processPosition = 0;
+
+            this.byteBufferRead.position(0);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+        }
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shutdown thread firstly
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoRecoverHAClient error, ", 
e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > 
this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        
this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        this.transferHeaderBuffer.putLong(0L);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
+    }
+
+    private void handshakeWithMaster() throws IOException {
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
+    }
+
+    private boolean reportSlaveMaxOffset() {
+        boolean result = true;
+        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
+        if (maxPhyOffset > this.currentReportedOffset) {
+            this.currentReportedOffset = maxPhyOffset;
+            result = reportSlaveOffset(this.currentReportedOffset);
+        }
+        return result;
+    }
+
+    public boolean connectMaster() throws ClosedChannelException {
+        if (null == this.socketChannel) {
+            String addr = this.masterHaAddress.get();
+            if (addr != null) {
+                SocketAddress socketAddress = 
RemotingUtil.string2SocketAddress(addr);
+                this.socketChannel = RemotingUtil.connect(socketAddress);
+                if (this.socketChannel != null) {
+                    this.socketChannel.register(this.selector, 
SelectionKey.OP_READ);
+                    LOGGER.info("AutoRecoverHAClient connect to master {}", 
addr);
+                    changeCurrentState(HAConnectionState.HANDSHAKE);
+                }
+            }
+            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+            this.lastReadTimestamp = System.currentTimeMillis();
+        }
+        return this.socketChannel != null;
+    }
+
+    private boolean transferFromMaster() throws IOException {
+        boolean result;
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", 
this.currentReportedOffset);
+            result = reportSlaveOffset(this.currentReportedOffset);
+            if (!result) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() {
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN:
+                        return;
+                    case READY:
+                        // Truncate invalid msg first
+                        final long truncateOffset = 
AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) {
+                            
AutoSwitchHAClient.this.epochCache.truncateFromOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoRecoverHAClient connect to master 
{} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue;
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue;
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break;
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - 
this.lastReadTimestamp;
+                if (interval > 
this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoRecoverHAClient, housekeeping, found this 
connection[" + this.masterHaAddress

Review Comment:
   AutoSwitchHAClient



##########
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java:
##########
@@ -33,6 +33,11 @@ public class MessageStoreConfig {
     private String storePathCommitLog = System.getProperty("user.home") + 
File.separator + "store"
         + File.separator + "commitlog";
 
+    // The directory in which the epoch checkpoint file is kept
+    @ImportantField
+    private String storePathEpochFile = System.getProperty("user.home") + 
File.separator + "store"
+        + File.separator + "epochFileCheckpoint";

Review Comment:
   It would be better to keep it in the same directory as the commitlog files; right now it is at the same level as the commitlog directory.
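   For example, one possible reading of this suggestion (a hypothetical default, not what the PR currently does):

   ```java
   // Hypothetical: keep the epoch checkpoint in the same directory as the commitlog files,
   // instead of as a sibling of the commitlog directory.
   @ImportantField
   private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
       + File.separator + "commitlog" + File.separator + "epochFileCheckpoint";
   ```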



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+    /**
+     * Transfer header buffer size. Schema: state ordinal + maxOffset
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new 
AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new 
AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = 
ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, 
DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new 
FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                
AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: 
" + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " 
NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState 
haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoRecoverHAClient close connection with master 
{}", this.masterHaAddress.get());

Review Comment:
   AutoSwitchHAClient



##########
store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.ha;
+
+import java.io.File;
+import java.nio.file.Paths;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.store.ha.autoswitch.EpochFileCache;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class EpochFileCacheTest {
+    private EpochFileCache epochCache;
+    private EpochFileCache epochCache2;
+    private String path;
+    private String path2;
+
+    @Before
+    public void setup() {
+        this.path = Paths.get(File.separator + "tmp", 
"EpochCheckpoint").toString();
+        this.epochCache = new EpochFileCache(path);
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(3, 500)));
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @After
+    public void shutdown() {
+        new File(this.path).delete();
+        if (this.path2 != null) {
+            new File(this.path2).delete();
+        }
+    }
+
+    @Test
+    public void testInitFromFile() {
+        // Remove entries, init from file
+        assertTrue(this.epochCache.initCacheFromFile());
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @Test
+    public void testTruncate() {
+        this.epochCache.truncateFromOffset(150);
+        assertNotNull(this.epochCache.getEntry(1));
+        assertNull(this.epochCache.getEntry(2));
+    }
+
+    @Test
+    public void testFindConsistentPointSample1() {
+        this.path2 = Paths.get(File.separator + "tmp", 
"EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 450)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 450>
+         *  The consistent point should be 450
+         */
+        final long consistentPoint = 
this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, 450);
+    }
+
+    @Test
+    public void testFindConsistentPointSample2() {
+        this.path2 = Paths.get(File.separator + "tmp", 
"EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 500)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 600>
+         *  The consistent point should be 600
+         */
+        this.epochCache.setLastEpochEntryEndOffset(700);
+        this.epochCache2.setLastEpochEntryEndOffset(600);
+        final long consistentPoint = 
this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, 600);
+    }

Review Comment:
   It would be better to add some other samples :-)
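   For instance, a sample where the latest epochs diverge could be added. A rough sketch, reusing the entries appended in setup (untested):

   ```java
   @Test
   public void testFindConsistentPointSample3() {
       this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint3").toString();
       this.epochCache2 = new EpochFileCache(path2);
       assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
       assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
       assertTrue(this.epochCache2.appendEntry(new EpochEntry(4, 450)));
       /**
        *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>
        *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch4, 450>
        *  Epoch3/Epoch4 do not match, so the consistent point should fall back to
        *  min(endOffset of Epoch2 in cache1, endOffset of Epoch2 in cache2) = min(500, 450) = 450
        */
       final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
       assertEquals(consistentPoint, 450);
   }
   ```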



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new 
EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", 
e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = 
this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || 
lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    return false;
+                }
+                lastEntry.setEndOffset(entry.getStartOffset());
+            }
+            this.epochMap.put(entry.getEpoch(), new EpochEntry(entry));
+            flush();
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Set endOffset for lastEpochEntry.
+     */
+    public void setLastEpochEntryEndOffset(final long endOffset) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = 
this.epochMap.lastEntry().getValue();
+                if (lastEntry.getStartOffset() <= endOffset) {
+                    lastEntry.setEndOffset(endOffset);
+                }
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public EpochEntry firstEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.firstEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry lastEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.lastEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public int lastEpoch() {
+        final EpochEntry entry = lastEntry();
+        if (entry != null) {
+            return entry.getEpoch();
+        }
+        return -1;
+    }
+
+    public EpochEntry getEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.containsKey(epoch)) {
+                final EpochEntry entry = this.epochMap.get(epoch);
+                return new EpochEntry(entry);
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry findEpochEntryByOffset(final long offset) {
+        this.readLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                for (Map.Entry<Integer, EpochEntry> entry : 
this.epochMap.entrySet()) {
+                    if (entry.getValue().getStartOffset() <= offset && 
entry.getValue().getEndOffset() > offset) {
+                        return new EpochEntry(entry.getValue());
+                    }
+                }
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry nextEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> entry = 
this.epochMap.ceilingEntry(epoch + 1);
+            if (entry != null) {
+                return new EpochEntry(entry.getValue());
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public List<EpochEntry> getAllEntries() {
+        this.readLock.lock();
+        try {
+            final ArrayList<EpochEntry> result = new 
ArrayList<>(this.epochMap.size());
+            this.epochMap.forEach((key, value) -> result.add(new 
EpochEntry(value)));
+            return result;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the consistentPoint between compareCache and local.
+     *
+     * @return the consistent offset
+     */
+    public long findConsistentPoint(final EpochFileCache compareCache) {
+        this.readLock.lock();
+        try {
+            long consistentOffset = -1;
+            final Map<Integer, EpochEntry> descendingMap = new 
TreeMap<>(this.epochMap).descendingMap();
+            final Iterator<Map.Entry<Integer, EpochEntry>> iter = 
descendingMap.entrySet().iterator();
+            while (iter.hasNext()) {
+                final Map.Entry<Integer, EpochEntry> curLocalEntry = 
iter.next();
+                final EpochEntry compareEntry = 
compareCache.getEntry(curLocalEntry.getKey());
+                if (compareEntry != null && compareEntry.getStartOffset() == 
curLocalEntry.getValue().getStartOffset()) {
+                    consistentOffset = 
Math.min(curLocalEntry.getValue().getEndOffset(), compareEntry.getEndOffset());
+                    break;
+                }
+            }
+            return consistentOffset;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Remove epochEntries with epoch >= truncateEpoch.
+     */
+    public void truncateFromEpoch(final int truncateEpoch) {
+        doTruncate((entry) -> entry.getEpoch() >= truncateEpoch);
+    }
+
+    /**
+     * Remove epochEntries with startOffset >= truncateOffset.
+     */
+    public void truncateFromOffset(final long truncateOffset) {
+        doTruncate((entry) -> entry.getStartOffset() >= truncateOffset);
+    }
+
+    /**
+     * Clear all epochEntries
+     */
+    public void clearAll() {
+        doTruncate((entry) -> true);
+    }
+
+    private void doTruncate(final Predicate<EpochEntry> predict) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> 
predict.test(entry.getValue()));
+            final EpochEntry entry = lastEntry();
+            if (entry != null) {
+                entry.setEndOffset(Long.MAX_VALUE);
+            }
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void flush() {
+        this.writeLock.lock();
+        try {
+            final ArrayList<EpochEntry> entries = new 
ArrayList<>(this.epochMap.values());
+            try {
+                if (this.checkpoint != null) {
+                    this.checkpoint.write(entries);
+                }
+            } catch (final IOException e) {
+                log.error("Error happen when flush epochEntries to 
epochCheckpointFile", e);
+            }

Review Comment:
   It would be better to initialize the entries list only when the checkpoint is not null.
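   Something like this, for example (a sketch only, assuming the usual unlock-in-finally as in the other methods):

   ```java
   private void flush() {
       this.writeLock.lock();
       try {
           if (this.checkpoint != null) {
               // Build the snapshot of entries only when there is a checkpoint file to write to.
               final ArrayList<EpochEntry> entries = new ArrayList<>(this.epochMap.values());
               try {
                   this.checkpoint.write(entries);
               } catch (final IOException e) {
                   log.error("Error happened when flushing epochEntries to epochCheckpointFile", e);
               }
           }
       } finally {
           this.writeLock.unlock();
       }
   }
   ```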



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new 
EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", 
e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = 
this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || 
lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    return false;
+                }

Review Comment:
   It would be better to print a log when appending an entry fails.
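   For example (sketch only; assumes EpochEntry has a meaningful toString()):

   ```java
   if (lastEntry.getEpoch() >= entry.getEpoch() || lastEntry.getStartOffset() >= entry.getStartOffset()) {
       // Log why the append is rejected before returning false.
       log.warn("Append epoch entry failed, new entry is not monotonic, lastEntry: {}, newEntry: {}", lastEntry, entry);
       return false;
   }
   ```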



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + 
additionalInfo(confirmOffset / lastFlushOffset).
+     * If the msg is normalMsg, the additionalInfo is confirmOffset
+     * If the msg is handshakeMsg, the body size = EpochEntrySize * EpochEntryNums, the additionalInfo is lastFlushOffset
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = 
HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel 
socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = 
this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new 
FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws 
IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        
haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, 
this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent 
error");
+                        break;
+                    }
+
+                    long interval = 
haService.getDefaultMessageStore().getSystemClock().now() - 
this.lastReadTimestamp;
+                    if (interval > 
haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval())
 {
+                        LOGGER.warn("ha housekeeping, found this connection[" 
+ clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() 
+ " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " 
service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - 
ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = 
ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = 
HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
+                                
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", 
currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", 
currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        
byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws 
IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = 
haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) 
{
+                
this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, 
this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, 
this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if 
(haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return 
haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + 
WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = 
ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = 
ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) 
throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        
haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = 
AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = 
AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = 
AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * 
EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(0L);

Review Comment:
   Why not put masterMaxOffset here instead of in the additionalInfo field?
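   i.e. something along these lines (just a sketch of the alternative, reusing the local variables of buildHandshakeBuffer):

   ```java
   this.byteBufferHeader.putInt(currentState.ordinal());                  // state
   this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);  // body size
   this.byteBufferHeader.putLong(maxPhyOffset);                           // offset = master max phy offset
   this.byteBufferHeader.putInt(lastEpoch);                               // epoch
   this.byteBufferHeader.putLong(0L);                                     // additional info, unused in this sketch
   ```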



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol for msgs synced from the master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset / lastFlushOffset).
+     * If the msg is a normal msg, the additionalInfo is confirmOffset.
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums and the additionalInfo is lastFlushOffset.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = 
HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel 
socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = 
this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new 
FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws 
IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        
haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, 
this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent 
error");
+                        break;
+                    }
+
+                    long interval = 
haService.getDefaultMessageStore().getSystemClock().now() - 
this.lastReadTimestamp;
+                    if (interval > 
haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval())
 {
+                        LOGGER.warn("ha housekeeping, found this connection[" 
+ clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() 
+ " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " 
service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - 
ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = 
ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = 
HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
+                                
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", 
currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", 
currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        
byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws 
IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = 
haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) 
{
+                
this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, 
this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, 
this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if 
(haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return 
haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + 
WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = 
ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = 
ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) 
throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        
haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = 
AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = 
AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = 
AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * 
EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(0L);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // Additional info(Flush position)
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, 
maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, 
this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, 
this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            
this.byteBufferHeader.putInt(AutoSwitchHAConnection.this.currentTransferEpoch);
+            // Additional info(confirm offset)
+            final long confirmOffset = 
AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, 
epoch:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, 
AutoSwitchHAConnection.this.currentTransferEpoch, confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() 
- this.lastWriteTimestamp;
+
+                if (interval > 
haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transferring the remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > 
haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize())
 {
+                    size = 
haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 
1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer 
speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", 
flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", 
flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the 
same epoch
+                if (this.nextTransferFromWhere + size > 
AutoSwitchHAConnection.this.currentTransferEpochEndOffset) {
+                    final EpochEntry epochEntry = 
AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than 
epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) 
(AutoSwitchHAConnection.this.currentTransferEpochEndOffset - 
this.nextTransferFromWhere);
+                    AutoSwitchHAConnection.this.currentTransferEpoch = 
epochEntry.getEpoch();
+                    AutoSwitchHAConnection.this.currentTransferEpochEndOffset 
= epochEntry.getEndOffset();
+                }
+
+                this.transferOffset = this.nextTransferFromWhere;
+                this.nextTransferFromWhere += size;
+
+                // Build Header
+                buildTransferHeaderBuffer(this.transferOffset, size);
+
+                this.lastWriteOver = this.transferData(size);
+            } else {
+                haService.getWaitNotifyObject().allWaitForRunning(100);
+            }
+        }
+
+        @Override public void run() {
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " 
service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+
+                    switch (currentState) {
+                        case HANDSHAKE:
+                            // Wait until the slave sends its handshake msg to the master.
+                            if (!isSlaveSendHandshake) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (this.lastWriteOver) {
+                                if (!buildHandshakeBuffer()) {
+                                    LOGGER.error("AutoSwitchHAConnection build 
handshake buffer failed");
+                                    this.waitForRunning(5000);
+                                    continue;
+                                }
+                            }
+
+                            this.lastWriteOver = handshakeWithSlave();
+                            if (this.lastWriteOver) {
+                                // Change the flag to false to wait for the slave's notification
+                                isSlaveSendHandshake = false;
+                            }
+                            break;
+                        case TRANSFER:
+                            if (-1 == slaveRequestOffset) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (-1 == this.nextTransferFromWhere) {
+                                if (0 == slaveRequestOffset) {
+                                    this.nextTransferFromWhere = 
haService.getDefaultMessageStore().getCommitLog().getMinOffset();
+                                } else {
+                                    this.nextTransferFromWhere = 
slaveRequestOffset;
+                                }
+                                // Setup initial transferEpoch
+                                EpochEntry epochEntry = 
AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
+                                if (epochEntry == null) {
+                                    LOGGER.error("Failed to find an epochEntry 
to match slaveRequestOffset {}", this.nextTransferFromWhere);
+                                    waitForRunning(500);
+                                    break;
+                                }
+                                
AutoSwitchHAConnection.this.currentTransferEpoch = epochEntry.getEpoch();
+                                
AutoSwitchHAConnection.this.currentTransferEpochEndOffset = 
epochEntry.getEndOffset();
+                                if (epochEntry.getEpoch() == 
AutoSwitchHAConnection.this.epochCache.lastEpoch()) {
+                                    
AutoSwitchHAConnection.this.currentTransferEpochEndOffset = Long.MAX_VALUE;

Review Comment:
   Although it is almost impossible to exceed Long.MAX_VALUE, it would be better to change it to -1 to represent infinity.
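   A minimal sketch of the suggested convention (illustrative only, not the actual patch; names follow the diff above), treating -1 as "no end offset yet" for the last epoch:

       // Illustrative only: -1 marks the last epoch's open end instead of Long.MAX_VALUE.
       public class EpochEndOffsetSketch {
           static final long NO_END_OFFSET = -1L;

           // True when the next transfer would cross the current epoch boundary.
           static boolean crossesEpochBoundary(long nextTransferFromWhere, int size,
               long currentTransferEpochEndOffset) {
               if (currentTransferEpochEndOffset == NO_END_OFFSET) {
                   return false; // last epoch: no upper bound, never crosses
               }
               return nextTransferFromWhere + size > currentTransferEpochEndOffset;
           }

           public static void main(String[] args) {
               System.out.println(crossesEpochBoundary(100, 50, NO_END_OFFSET)); // false
               System.out.println(crossesEpochBoundary(100, 50, 120));           // true
           }
       }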


