RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r866426430


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+    /**
+     * Transfer header buffer size. Schema: state ordinal + maxOffset
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new 
AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new 
AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = 
ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    /**
+     * Creates the slave-side HA client, stores its collaborators and immediately
+     * initializes the runtime state through {@link #init()}.
+     *
+     * @param haService the owning HA service
+     * @param defaultMessageStore the local message store
+     * @param epochCache the local epoch cache
+     * @throws IOException propagated from {@link #init()}
+     */
+    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.epochCache = epochCache;
+        this.messageStore = defaultMessageStore;
+        this.haService = haService;
+        this.init();
+    }
+
+    /**
+     * Opens a fresh selector, creates the flow monitor, reader and writer (with their
+     * timestamp hooks), moves to {@link HAConnectionState#READY} and resets all progress counters.
+     *
+     * @throws IOException declared for the selector setup
+     */
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+
+        // Reader hook: account for the transferred bytes and remember when data last arrived.
+        this.haReader = new HAClientReader();
+        this.haReader.registerHook(readSize -> {
+            if (readSize <= 0) {
+                return;
+            }
+            AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+            lastReadTimestamp = System.currentTimeMillis();
+        });
+
+        // Writer hook: remember when something was last written to the master.
+        this.haWriter = new HAWriter();
+        this.haWriter.registerHook(writeSize -> {
+            if (writeSize <= 0) {
+                return;
+            }
+            lastWriteTimestamp = System.currentTimeMillis();
+        });
+
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * Shuts the client down and then re-initializes it.
+     *
+     * @throws IOException propagated from {@link #init()}
+     */
+    public void reOpen() throws IOException {
+        this.shutdown();
+        this.init();
+    }
+
+    /** @return the simple class name of this client, used as the service name */
+    @Override public String getServiceName() {
+        final String serviceName = AutoSwitchHAClient.class.getSimpleName();
+        return serviceName;
+    }
+
+    /**
+     * Replaces the master address with a compare-and-set against the value just read,
+     * and logs the old and new address when the swap succeeds.
+     *
+     * @param newAddress the new master address
+     */
+    @Override public void updateMasterAddress(String newAddress) {
+        final String previous = this.masterAddress.get();
+        final boolean swapped = this.masterAddress.compareAndSet(previous, newAddress);
+        if (swapped) {
+            LOGGER.info("update master address, OLD: " + previous + " NEW: " + newAddress);
+        }
+    }
+
+    /**
+     * Replaces the master HA address with a compare-and-set against the value just read,
+     * and logs the old and new address when the swap succeeds.
+     *
+     * @param newAddress the new master HA address
+     */
+    @Override public void updateHaMasterAddress(String newAddress) {
+        final String previous = this.masterHaAddress.get();
+        final boolean swapped = this.masterHaAddress.compareAndSet(previous, newAddress);
+        if (swapped) {
+            LOGGER.info("update master ha address, OLD: " + previous + " NEW: " + newAddress);
+        }
+    }
+
+    /** @return the currently stored master address (may be {@code null} if never set) */
+    @Override public String getMasterAddress() {
+        return masterAddress.get();
+    }
+
+    /** @return the currently stored master HA address (may be {@code null} if never set) */
+    @Override public String getHaMasterAddress() {
+        return masterHaAddress.get();
+    }
+
+    /** @return the timestamp recorded the last time data was read from the master */
+    @Override public long getLastReadTimestamp() {
+        return lastReadTimestamp;
+    }
+
+    /** @return the timestamp recorded the last time something was written to the master */
+    @Override public long getLastWriteTimestamp() {
+        return lastWriteTimestamp;
+    }
+
+    /** @return the current connection state of this client */
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    /**
+     * Logs the requested state and then makes it the current state.
+     *
+     * @param haConnectionState the state to switch to
+     */
+    @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        currentState = haConnectionState;
+    }
+
+    /** Closes the connection to the master, then waits up to 5 seconds before returning. */
+    public void closeMasterAndWait() {
+        closeMaster();
+        final long waitMillis = 1000 * 5;
+        waitForRunning(waitMillis);
+    }
+
+    /**
+     * Closes the connection to the master (if any) and resets the read-side bookkeeping.
+     * <p>
+     * The channel reference is dropped and the state returns to {@link HAConnectionState#READY}
+     * even when cancelling the key or closing the channel throws. Otherwise a failed close would
+     * leave a non-null, unusable channel behind and {@link #connectMaster()} would keep
+     * treating it as an established connection.
+     */
+    @Override public void closeMaster() {
+        if (null == this.socketChannel) {
+            return;
+        }
+
+        try {
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            this.socketChannel.close();
+            LOGGER.info("AutoSwitchHAClient close connection with master {}", this.masterHaAddress.get());
+        } catch (IOException e) {
+            LOGGER.warn("CloseMaster exception. ", e);
+        } finally {
+            // Always forget the channel and go back to READY so the next loop iteration reconnects.
+            this.socketChannel = null;
+            this.changeCurrentState(HAConnectionState.READY);
+        }
+
+        this.lastReadTimestamp = 0;
+        this.processPosition = 0;
+
+        // Reset the read buffer to its full, empty extent.
+        this.byteBufferRead.position(0);
+        this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+    }
+
+    /** @return the per-second transferred byte count reported by the flow monitor */
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shutdown thread firstly
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > 
this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        
this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        this.transferHeaderBuffer.putLong(0L);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
+    }
+
+    /**
+     * Performs one handshake round with the master: sends the handshake header once,
+     * waits up to 5 seconds on the selector, then reads the master's reply.
+     * <p>
+     * If either the send or the read fails, the connection is closed (with a wait) and the
+     * method returns immediately, so a channel that was just closed is never read from.
+     *
+     * @throws IOException if the selector wait fails
+     */
+    private void handshakeWithMaster() throws IOException {
+        // Send the header exactly once and act on that result.
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+            // closeMaster() has nulled the channel; do not fall through to the read below.
+            return;
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
+    }
+
+    /**
+     * Reports the local max physical offset to the master, but only when it has advanced
+     * beyond the last reported offset.
+     *
+     * @return {@code true} when nothing needed reporting, otherwise the result of the report
+     */
+    private boolean reportSlaveMaxOffset() {
+        final long localMaxOffset = this.messageStore.getMaxPhyOffset();
+        if (localMaxOffset <= this.currentReportedOffset) {
+            return true;
+        }
+        this.currentReportedOffset = localMaxOffset;
+        return reportSlaveOffset(this.currentReportedOffset);
+    }
+
+    /**
+     * Connects to the master HA address when no channel is open yet. On success the channel
+     * is registered for reads and the state moves to HANDSHAKE. Whenever there was no open
+     * channel on entry, the reported offset and the last-read timestamp are refreshed,
+     * regardless of whether the connect succeeded.
+     *
+     * @return {@code true} when a channel is available after the call
+     * @throws ClosedChannelException if registering the channel with the selector fails
+     */
+    public boolean connectMaster() throws ClosedChannelException {
+        if (this.socketChannel != null) {
+            return true;
+        }
+
+        final String addr = this.masterHaAddress.get();
+        if (addr != null) {
+            final SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+            this.socketChannel = RemotingUtil.connect(socketAddress);
+            if (this.socketChannel != null) {
+                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+                LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+                changeCurrentState(HAConnectionState.HANDSHAKE);
+            }
+        }
+        this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+        this.lastReadTimestamp = System.currentTimeMillis();
+
+        return this.socketChannel != null;
+    }
+
+    /**
+     * One transfer round: sends a heartbeat offset report when it is due, waits up to one
+     * second on the selector, reads from the master and finally reports the new max offset.
+     *
+     * @return {@code false} as soon as any write or read step fails
+     * @throws IOException if the selector wait fails
+     */
+    private boolean transferFromMaster() throws IOException {
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
+            if (!reportSlaveOffset(this.currentReportedOffset)) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        if (!this.haReader.read(this.socketChannel, this.byteBufferRead)) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() { // service loop: dispatches on currentState until the thread is stopped
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN: // leave the loop (and the thread) as soon as SHUTDOWN is observed
+                        return;
+                    case READY:
+                        // Truncate invalid msg first, then try to connect to the master
+                        final long truncateOffset = 
AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) { // a negative result skips the epoch-cache truncation
+                            
AutoSwitchHAClient.this.epochCache.truncateSuffixByOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoSwitchHAClient connect to master 
{} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue; // re-evaluate the state; skips the housekeeping check below
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue; // skips the housekeeping check below
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break; // only a successful transfer round reaches the housekeeping check
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - 
this.lastReadTimestamp;
+                if (interval > 
this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoSwitchHAClient, housekeeping, found this 
connection[" + this.masterHaAddress
+                        + "] expired, " + interval);
+                    closeMaster(); // nothing read for longer than the housekeeping interval: drop the connection
+                    LOGGER.warn("AutoSwitchHAClient, master not response some 
time, so close connection");
+                }
+            } catch (Exception e) {
+                LOGGER.warn(this.getServiceName() + " service has exception. 
", e);
+                closeMasterAndWait(); // any failure in a round closes the connection and backs off
+            }
+        }
+
+    }
+
+    /**
+     * Compare the master and slave's epoch file, find consistent point, do 
truncate.
+     */
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long 
masterEndOffset) {
+        final EpochFileCache masterEpochCache = new EpochFileCache();
+        masterEpochCache.initCacheFromEntries(masterEpochEntries);
+        masterEpochCache.setLastEpochEntryEndOffset(masterEndOffset);
+        final EpochFileCache localEpochCache = new EpochFileCache();
+        localEpochCache.initCacheFromEntries(this.epochCache.getAllEntries());
+        
localEpochCache.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());
+
+        final long truncateOffset = 
localEpochCache.findConsistentPoint(masterEpochCache);

Review Comment:
   1. 如果因为某些原因(比如日志被删除)找不到主备间的一致点,应该等待人工处理,而不是继续往下走。
   
2. 如果 slave 是空的,是不是可以直接不走截断流程?这里之所以结果正确,主要是因为找不到主备间的一致点时 currentReportedOffset=-1,随后在 reportSlaveMaxOffset 中被修正成 0。



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch  + epochStartOffset 
+ additionalInfo(confirmOffset).
+     * If the msg is a handshake msg, the body size = EpochEntrySize * 
EpochEntryNums, and the offset is the maxOffset in master.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = 
HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    /**
+     * Wraps an accepted slave channel: records the client address, switches the channel to
+     * non-blocking mode, tunes the socket options, creates the write and read services,
+     * increments the HA service's connection count and creates the flow monitor.
+     *
+     * @param haService the owning HA service
+     * @param socketChannel the accepted channel to the slave
+     * @param epochCache the master's epoch cache
+     * @throws IOException if configuring the channel or creating a socket service fails
+     */
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = socketChannel.socket().getRemoteSocketAddress().toString();
+
+        // Channel / socket tuning
+        socketChannel.configureBlocking(false);
+        socketChannel.socket().setSoLinger(false, -1);
+        socketChannel.socket().setTcpNoDelay(true);
+        socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        socketChannel.socket().setSendBufferSize(1024 * 64);
+
+        this.writeSocketService = new WriteSocketService(socketChannel);
+        this.readSocketService = new ReadSocketService(socketChannel);
+        haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    /** Enters HANDSHAKE, then starts the flow monitor, the read service and the write service. */
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        flowMonitor.start();
+        readSocketService.start();
+        writeSocketService.start();
+    }
+
+    /**
+     * Enters SHUTDOWN, shuts down the flow monitor, the write service and the read service
+     * (each with the {@code true} flag), then closes the channel.
+     */
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        flowMonitor.shutdown(true);
+        writeSocketService.shutdown(true);
+        readSocketService.shutdown(true);
+        close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    /**
+     * Logs the requested state and then makes it the current state of this connection.
+     *
+     * @param connectionState the state to switch to
+     */
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        currentState = connectionState;
+    }
+
+    /** @return the current state of this connection */
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    /** @return the channel connected to the slave */
+    @Override public SocketChannel getSocketChannel() {
+        return this.socketChannel;
+    }
+
+    /** @return the remote socket address of the slave, captured at construction time */
+    @Override public String getClientAddress() {
+        return this.clientAddress;
+    }
+
+    /** @return the latest offset reported by the slave, or -1 if none was reported yet */
+    @Override public long getSlaveAckOffset() {
+        return this.slaveAckOffset;
+    }
+
+    /** @return the per-second transferred byte count reported by the flow monitor */
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    /** @return the next transfer offset as tracked by the write service */
+    @Override public long getTransferFromWhere() {
+        return writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to stand for Long.max
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws 
IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        
haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, 
this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent 
error");
+                        break;
+                    }
+
+                    long interval = 
haService.getDefaultMessageStore().getSystemClock().now() - 
this.lastReadTimestamp;
+                    if (interval > 
haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval())
 {
+                        LOGGER.warn("ha housekeeping, found this connection[" 
+ clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() 
+ " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " 
service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - 
ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = 
ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = 
HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
+                                
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", 
currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", 
currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        
byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Write service that takes its payload from the commit log of the default message store.
+     */
+    class WriteSocketService extends AbstractWriteSocketService {
+        /** Commit-log slice selected for the transfer in progress; {@code null} when there is none. */
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        /**
+         * Selects the commit-log data starting at {@code nextTransferFromWhere}.
+         *
+         * @return the size of the selected data, or 0 when nothing (or an empty result) was selected
+         */
+        @Override
+        protected int getNextTransferDataSize() {
+            final SelectMappedBufferResult candidate =
+                haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            final boolean hasData = candidate != null && candidate.getSize() > 0;
+            if (!hasData) {
+                return 0;
+            }
+            this.selectMappedBufferResult = candidate;
+            return candidate.getSize();
+        }
+
+        /** Releases the selected commit-log slice and forgets it. */
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        /**
+         * Writes the header and, when a slice is selected, the body.
+         *
+         * @param maxTransferSize when non-negative, the limit applied to the body buffer
+         * @return {@code false} when the header or the body write fails; {@code true} otherwise
+         */
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+            final boolean hasBody = null != this.selectMappedBufferResult;
+            if (hasBody && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Header first
+            if (!haWriter.write(this.socketChannel, this.byteBufferHeader)) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Then the body; the slice is released only after a successful write
+            final boolean bodyWritten = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+            if (bodyWritten) {
+                releaseData();
+            }
+            return bodyWritten;
+        }
+
+        /** Releases a still-selected slice when the service stops. */
+        @Override
+        protected void onStop() {
+            if (null == this.selectMappedBufferResult) {
+                return;
+            }
+            this.selectMappedBufferResult.release();
+        }
+
+        /** @return the simple class name, prefixed with the logger identifier when in a broker container */
+        @Override
+        public String getServiceName() {
+            final boolean inContainer = haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer();
+            if (!inContainer) {
+                return WriteSocketService.class.getSimpleName();
+            }
+            return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = 
ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = 
ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        /**
+         * Opens a selector, registers the channel for writes and installs a writer hook that
+         * feeds every write size to the flow monitor and refreshes {@code lastWriteTimestamp}
+         * for positive sizes.
+         *
+         * @param socketChannel the channel connected to the slave
+         * @throws IOException if the selector cannot be opened or the channel cannot be registered
+         */
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            setDaemon(true);
+            this.haWriter = new HAWriter();
+            this.haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize <= 0) {
+                    return;
+                }
+                AbstractWriteSocketService.this.lastWriteTimestamp =
+                    haService.getDefaultMessageStore().getSystemClock().now();
+            });
+        }
+
+        /** @return the offset from which the next transfer starts (initially -1) */
+        public long getNextTransferFromWhere() {
+            return nextTransferFromWhere;
+        }
+
+        // Handshake method
+        /**
+         * Prepares the handshake message: a header carrying the current state, the body size,
+         * the master's max physical offset and the last epoch, followed by a body holding
+         * (epoch, startOffset) for every non-null epoch entry.
+         *
+         * @return always {@code true}
+         */
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+
+            // Header: state | body size | offset | epoch | epochStartOffset | additional info
+            final ByteBuffer header = this.byteBufferHeader;
+            header.position(0);
+            header.limit(MSG_HEADER_SIZE);
+            header.putInt(currentState.ordinal());
+            header.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            header.putLong(maxPhyOffset);
+            header.putInt(lastEpoch);
+            // The last two fields are not needed in a handshake
+            header.putLong(0L);
+            header.putLong(0L);
+            header.flip();
+
+            // Body: one (epoch, startOffset) pair per entry
+            final ByteBuffer body = this.handShakeBuffer;
+            body.position(0);
+            body.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry == null) {
+                    continue;
+                }
+                body.putInt(entry.getEpoch());
+                body.putLong(entry.getStartOffset());
+            }
+            body.flip();
+
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        /**
+         * Writes the prepared handshake header and then the epoch-entry body to the slave.
+         *
+         * @return {@code true} only if both writes return {@code true}; the body write is
+         *         not attempted when the header write returns {@code false}
+         */
+        private boolean handshakeWithSlave() {
+            // Short-circuit evaluation: header first, body only after the header write succeeded
+            return this.haWriter.write(this.socketChannel, this.byteBufferHeader)
+                && this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        /**
+         * Fills {@code byteBufferHeader} with the header of a transfer message and flips it
+         * so it is ready to be written.
+         *
+         * <p>Fields are written in this order: state ordinal, body size, offset, epoch,
+         * epoch start offset, confirm offset. If the epoch cache holds no entry for the
+         * current transfer epoch, an error is logged and the header buffer is left untouched.
+         *
+         * @param nextOffset offset stored in the header's offset field
+         * @param bodySize   body length announced in the header (callers pass 0 for a header-only message)
+         */
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+            if (epochEntry == null) {
+                LOGGER.error("Failed to find epochEntry with epoch {} when build msg header", AutoSwitchHAConnection.this.currentTransferEpoch);
+                return;
+            }
+            final int epoch = epochEntry.getEpoch();
+            final long epochStartOffset = epochEntry.getStartOffset();
+
+            // Reset the header buffer to exactly one header's worth of space
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+
+            this.byteBufferHeader.putInt(currentState.ordinal()); // State
+            this.byteBufferHeader.putInt(bodySize);               // Body size
+            this.byteBufferHeader.putLong(nextOffset);            // Offset
+            this.byteBufferHeader.putInt(epoch);                  // Epoch
+            this.byteBufferHeader.putLong(epochStartOffset);      // EpochStartOffset
+
+            // Additional info (confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, epochStartOffset:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, epoch, epochStartOffset, confirmOffset);
+        }
+
+        /**
+         * Performs one transfer step towards the slave.
+         *
+         * <ol>
+         *   <li>If the previous write did not finish, continues it ({@code transferData(-1)}).
+         *       Otherwise, when nothing has been written for longer than the configured
+         *       heartbeat interval, sends a header-only message with body size 0.</li>
+         *   <li>Determines the next batch size, capped by the configured HA transfer batch
+         *       size and by the flow monitor's currently allowed byte count.</li>
+         *   <li>Cuts the batch at the end offset of the current transfer epoch so that a
+         *       single batch never spans two epochs, then switches to the next epoch.</li>
+         *   <li>Advances {@code nextTransferFromWhere}, builds the header and starts the write.</li>
+         * </ol>
+         *
+         * @throws Exception propagated from the underlying transfer calls
+         */
+        private void transferToSlave() throws Exception {
+            if (!this.lastWriteOver) {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            } else {
+                final long idleMillis =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (idleMillis > haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
+                    // Idle for too long: send a header with an empty body
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            }
+
+            int batchBytes = this.getNextTransferDataSize();
+            if (batchBytes <= 0) {
+                // Nothing to send right now
+                haService.getWaitNotifyObject().allWaitForRunning(100);
+                return;
+            }
+
+            // Cap by the configured batch size
+            final int configuredBatchSize = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+            if (batchBytes > configuredBatchSize) {
+                batchBytes = configuredBatchSize;
+            }
+
+            // Cap by what the flow monitor currently allows
+            final int allowedBytes = flowMonitor.canTransferMaxByteNum();
+            if (batchBytes > allowedBytes) {
+                // Log at most once per second
+                if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                    LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                        String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                        String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                    lastPrintTimestamp = System.currentTimeMillis();
+                }
+                batchBytes = allowedBytes;
+            }
+            if (batchBytes <= 0) {
+                this.releaseData();
+                this.waitForRunning(100);
+                return;
+            }
+
+            // We must ensure that the transmitted logs are within the same epoch
+            // If epochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = Long.max
+            final long epochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+            if (epochEndOffset != -1 && this.nextTransferFromWhere + batchBytes > epochEndOffset) {
+                final EpochEntry nextEpochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                if (nextEpochEntry == null) {
+                    LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                    // NOTE(review): unlike the flow-control bail-out above, this path does not
+                    // call releaseData() - confirm whether the selected data must be released here.
+                    waitForRunning(100);
+                    return;
+                }
+                // Send only up to the end of the current epoch, then move on to the next one
+                batchBytes = (int) (epochEndOffset - this.nextTransferFromWhere);
+                changeTransferEpochToNext(nextEpochEntry);
+            }
+
+            this.transferOffset = this.nextTransferFromWhere;
+            this.nextTransferFromWhere += batchBytes;
+
+            // Build Header
+            buildTransferHeaderBuffer(this.transferOffset, batchBytes);
+
+            this.lastWriteOver = this.transferData(batchBytes);
+        }
+
+        @Override public void run() {
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " 
service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+
+                    switch (currentState) {
+                        case HANDSHAKE:
+                            // Wait until the slave send it handshake msg to 
master.
+                            if (!isSlaveSendHandshake) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (this.lastWriteOver) {
+                                if (!buildHandshakeBuffer()) {
+                                    LOGGER.error("AutoSwitchHAConnection build 
handshake buffer failed");
+                                    this.waitForRunning(5000);
+                                    continue;
+                                }
+                            }
+
+                            this.lastWriteOver = handshakeWithSlave();
+                            if (this.lastWriteOver) {
+                                // change flag to {false} to wait for slave 
notification
+                                isSlaveSendHandshake = false;
+                            }
+                            break;
+                        case TRANSFER:
+                            if (-1 == slaveRequestOffset) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (-1 == this.nextTransferFromWhere) {
+                                if (0 == slaveRequestOffset) {
+                                    // We must ensure that the starting point 
of syncing log
+                                    // must be the startOffset of a file 
(maybe the last file, or the minOffset)
+                                    final MessageStoreConfig config = 
haService.getDefaultMessageStore().getMessageStoreConfig();
+                                    if (config.isSyncFromLastFile()) {

Review Comment:
   从语义上来说这个参数应该是在slave上生效的



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to