This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller 
by this push:
     new 99d77df54 [Summer of code] Fix the issue of frequent disconnection of 
ha connection (#4399)
99d77df54 is described below

commit 99d77df54f2e87be73cd33689292cc85f006d353
Author: hzh0425 <642256...@qq.com>
AuthorDate: Thu Jun 2 11:07:19 2022 +0800

    [Summer of code] Fix the issue of frequent disconnection of ha connection 
(#4399)
    
    * fix bug
    
    * throw exception in haWriter
    
    * fix haconnection can't read msg.
    
    * set channel buf = 0
    
    * code review
    
    * code review
    
    * change client address to slave address
    
    * Revert "change client address to slave address"
    
    This reverts commit 198bcdb00e6828744514df76659404150c02011f.
---
 .../org/apache/rocketmq/broker/BrokerStartup.java  |  28 ++--
 .../container/BrokerContainerProcessor.java        |  30 ++--
 .../rocketmq/container/BrokerContainerStartup.java |  28 ++--
 .../rocketmq/store/ha/DefaultHAConnection.java     |   9 +-
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 176 +++++++++++----------
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  94 +++++------
 .../store/ha/autoswitch/AutoSwitchHAService.java   |  14 +-
 .../org/apache/rocketmq/store/ha/io/HAWriter.java  |  30 ++--
 8 files changed, 215 insertions(+), 194 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index ca388b6cf..f4a2fe0c3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -154,20 +154,22 @@ public class BrokerStartup {
                 }
             }
 
-            switch (messageStoreConfig.getBrokerRole()) {
-                case ASYNC_MASTER:
-                case SYNC_MASTER:
-                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
-                    break;
-                case SLAVE:
-                    if (brokerConfig.getBrokerId() <= 0) {
-                        System.out.printf("Slave's brokerId must be > 0");
-                        System.exit(-3);
-                    }
+            if (!brokerConfig.isEnableControllerMode()) {
+                switch (messageStoreConfig.getBrokerRole()) {
+                    case ASYNC_MASTER:
+                    case SYNC_MASTER:
+                        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                        break;
+                    case SLAVE:
+                        if (brokerConfig.getBrokerId() <= 0) {
+                            System.out.printf("Slave's brokerId must be > 0");
+                            System.exit(-3);
+                        }
 
-                    break;
-                default:
-                    break;
+                        break;
+                    default:
+                        break;
+                }
             }
 
             if (messageStoreConfig.isEnableDLegerCommitLog()) {
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
index 2512a3307..51cd427dc 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
@@ -117,21 +117,23 @@ public class BrokerContainerProcessor implements 
NettyRequestProcessor {
         }
 
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
-            switch (messageStoreConfig.getBrokerRole()) {
-                case ASYNC_MASTER:
-                case SYNC_MASTER:
-                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
-                    break;
-                case SLAVE:
-                    if (brokerConfig.getBrokerId() <= 0) {
-                        response.setCode(ResponseCode.SYSTEM_ERROR);
-                        response.setRemark("slave broker id must be > 0");
-                        return response;
-                    }
-                    break;
-                default:
-                    break;
+            if (!brokerConfig.isEnableControllerMode()) {
+                switch (messageStoreConfig.getBrokerRole()) {
+                    case ASYNC_MASTER:
+                    case SYNC_MASTER:
+                        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                        break;
+                    case SLAVE:
+                        if (brokerConfig.getBrokerId() <= 0) {
+                            response.setCode(ResponseCode.SYSTEM_ERROR);
+                            response.setRemark("slave broker id must be > 0");
+                            return response;
+                        }
+                        break;
+                    default:
+                        break;
 
+                }
             }
 
             if (messageStoreConfig.getTotalReplicas() < 
messageStoreConfig.getInSyncReplicas()
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
index 94066af8a..d4e94a698 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
@@ -149,20 +149,22 @@ public class BrokerContainerStartup {
 
         
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), 
brokerConfig);
 
-        switch (messageStoreConfig.getBrokerRole()) {
-            case ASYNC_MASTER:
-            case SYNC_MASTER:
-                brokerConfig.setBrokerId(MixAll.MASTER_ID);
-                break;
-            case SLAVE:
-                if (brokerConfig.getBrokerId() <= 0) {
-                    System.out.printf("Slave's brokerId must be > 0%n");
-                    System.exit(-3);
-                }
+        if (!brokerConfig.isEnableControllerMode()) {
+            switch (messageStoreConfig.getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                    break;
+                case SLAVE:
+                    if (brokerConfig.getBrokerId() <= 0) {
+                        System.out.printf("Slave's brokerId must be > 0%n");
+                        System.exit(-3);
+                    }
 
-                break;
-            default:
-                break;
+                    break;
+                default:
+                    break;
+            }
         }
 
         if (messageStoreConfig.getTotalReplicas() < 
messageStoreConfig.getInSyncReplicas()
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
index d99099844..aa5da1eb4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public class DefaultHAConnection implements HAConnection {
@@ -48,8 +49,12 @@ public class DefaultHAConnection implements HAConnection {
         this.socketChannel.configureBlocking(false);
         this.socketChannel.socket().setSoLinger(false, -1);
         this.socketChannel.socket().setTcpNoDelay(true);
-        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
-        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        if (NettySystemConfig.socketSndbufSize > 0) {
+            
this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
+        }
+        if (NettySystemConfig.socketRcvbufSize > 0) {
+            
this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
+        }
         this.writeSocketService = new WriteSocketService(this.socketChannel);
         this.readSocketService = new ReadSocketService(this.socketChannel);
         this.haService.getConnectionCount().incrementAndGet();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 01272d47a..997deb7e8 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.store.ha.autoswitch;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -58,6 +57,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
      * Transfer header buffer size. Schema: state ordinal + maxOffset.
      */
     public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE, 
TRANSFER_HEADER_SIZE);
     private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
     private final AtomicReference<String> masterHaAddress = new 
AtomicReference<>();
@@ -247,7 +247,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         return interval > 
this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
     }
 
-    private boolean sendHandshakeHeader() {
+    private boolean sendHandshakeHeader() throws IOException {
         this.handshakeHeaderBuffer.position(0);
         this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE);
         // Original state
@@ -278,11 +278,10 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         result = this.haReader.read(this.socketChannel, this.byteBufferRead);
         if (!result) {
             closeMasterAndWait();
-            return;
         }
     }
 
-    private boolean reportSlaveOffset(final long offsetToReport) {
+    private boolean reportSlaveOffset(final long offsetToReport) throws 
IOException {
         this.transferHeaderBuffer.position(0);
         this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
         this.transferHeaderBuffer.putInt(this.currentState.ordinal());
@@ -291,7 +290,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
     }
 
-    private boolean reportSlaveMaxOffset() {
+    private boolean reportSlaveMaxOffset() throws IOException {
         boolean result = true;
         final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
         if (maxPhyOffset > this.currentReportedOffset) {
@@ -301,7 +300,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         return result;
     }
 
-    public boolean connectMaster() throws ClosedChannelException {
+    public boolean connectMaster() throws IOException {
         if (null == this.socketChannel) {
             String addr = this.masterHaAddress.get();
             if (StringUtils.isNotEmpty(addr)) {
@@ -391,7 +390,7 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
     /**
      * Compare the master and slave's epoch file, find consistent point, do 
truncate.
      */
-    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long 
masterEndOffset) {
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long 
masterEndOffset) throws IOException {
         if (this.epochCache.getEntrySize() == 0) {
             // If epochMap is empty, means the broker is a new replicas
             LOGGER.info("Slave local epochCache is empty, skip truncate log");
@@ -433,94 +432,97 @@ public class AutoSwitchHAClient extends ServiceThread 
implements HAClient {
         @Override
         protected boolean processReadResult(ByteBuffer byteBufferRead) {
             int readSocketPos = byteBufferRead.position();
+            try {
+                while (true) {
+                    int diff = byteBufferRead.position() - 
AutoSwitchHAClient.this.processPosition;
+                    if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
+                        int processPosition = 
AutoSwitchHAClient.this.processPosition;
+                        int masterState = 
byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 36);
+                        int bodySize = byteBufferRead.getInt(processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
+                        long masterOffset = 
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 28);
+                        int masterEpoch = 
byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 20);
+                        long masterEpochStartOffset = 
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 16);
+                        long confirmOffset = 
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 8);
+
+                        if (masterState != 
AutoSwitchHAClient.this.currentState.ordinal()) {
+                            AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+                            AutoSwitchHAClient.this.waitForRunning(1);
+                            LOGGER.error("State not matched, masterState:{}, 
slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, 
masterEpochStartOffset:{}, confirmOffset:{}",
+                                masterState, 
AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, 
masterEpochStartOffset, confirmOffset);
+                            return true;
+                        }
 
-            while (true) {
-                int diff = byteBufferRead.position() - 
AutoSwitchHAClient.this.processPosition;
-                if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
-                    int processPosition = 
AutoSwitchHAClient.this.processPosition;
-                    int masterState = byteBufferRead.getInt(processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE - 36);
-                    int bodySize = byteBufferRead.getInt(processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
-                    long masterOffset = byteBufferRead.getLong(processPosition 
+ AutoSwitchHAConnection.MSG_HEADER_SIZE - 28);
-                    int masterEpoch = byteBufferRead.getInt(processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE - 20);
-                    long masterEpochStartOffset = 
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 16);
-                    long confirmOffset = 
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE 
- 8);
-
-                    if (masterState != 
AutoSwitchHAClient.this.currentState.ordinal()) {
-                        AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
-                        AutoSwitchHAClient.this.waitForRunning(1);
-                        LOGGER.error("State not matched, masterState:{}, 
slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, 
masterEpochStartOffset:{}, confirmOffset:{}",
-                            masterState, AutoSwitchHAClient.this.currentState, 
bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
-                        return true;
-                    }
+                        if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + 
bodySize)) {
+                            switch (AutoSwitchHAClient.this.currentState) {
+                                case HANDSHAKE:
+                                    AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE;
+                                    // Truncate log
+                                    int entrySize = 
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+                                    final int entryNums = bodySize / entrySize;
+                                    final ArrayList<EpochEntry> epochEntries = 
new ArrayList<>(entryNums);
+                                    for (int i = 0; i < entryNums; i++) {
+                                        int epoch = 
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+                                        long startOffset = 
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize 
+ 4);
+                                        epochEntries.add(new EpochEntry(epoch, 
startOffset));
+                                    }
+                                    byteBufferRead.position(readSocketPos);
+                                    AutoSwitchHAClient.this.processPosition += 
bodySize;
+                                    LOGGER.info("Receive handshake, 
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, 
epochEntries);
+                                    if (!doTruncate(epochEntries, 
masterOffset)) {
+                                        waitForRunning(1000 * 2);
+                                        LOGGER.error("AutoSwitchHAClient 
truncate log failed in handshake state");
+                                        return false;
+                                    }
+                                    break;
+                                case TRANSFER:
+                                    byte[] bodyData = new byte[bodySize];
+                                    
byteBufferRead.position(AutoSwitchHAClient.this.processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE);
+                                    byteBufferRead.get(bodyData);
+                                    byteBufferRead.position(readSocketPos);
+                                    AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+
+                                    long slavePhyOffset = 
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+                                    if (slavePhyOffset != 0) {
+                                        if (slavePhyOffset != masterOffset) {
+                                            LOGGER.error("master pushed offset 
not equal the max phy offset in slave, SLAVE: "
+                                                + slavePhyOffset + " MASTER: " 
+ masterOffset);
+                                            return false;
+                                        }
+                                    }
+
+                                    // If epoch changed
+                                    if (masterEpoch != 
AutoSwitchHAClient.this.currentReceivedEpoch) {
+                                        
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+                                        
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, 
masterEpochStartOffset));
+                                    }
+                                    AutoSwitchHAClient.this.confirmOffset = 
Math.min(confirmOffset, messageStore.getMaxPhyOffset());
 
-                    if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + 
bodySize)) {
-                        switch (AutoSwitchHAClient.this.currentState) {
-                            case HANDSHAKE:
-                                AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE;
-                                // Truncate log
-                                int entrySize = 
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
-                                final int entryNums = bodySize / entrySize;
-                                final ArrayList<EpochEntry> epochEntries = new 
ArrayList<>(entryNums);
-                                for (int i = 0; i < entryNums; i++) {
-                                    int epoch = 
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
-                                    long startOffset = 
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize 
+ 4);
-                                    epochEntries.add(new EpochEntry(epoch, 
startOffset));
-                                }
-                                byteBufferRead.position(readSocketPos);
-                                AutoSwitchHAClient.this.processPosition += 
bodySize;
-                                LOGGER.info("Receive handshake, 
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, 
epochEntries);
-                                if (!doTruncate(epochEntries, masterOffset)) {
-                                    waitForRunning(1000 * 2);
-                                    LOGGER.error("AutoSwitchHAClient truncate 
log failed in handshake state");
-                                    return false;
-                                }
-                                break;
-                            case TRANSFER:
-                                byte[] bodyData = new byte[bodySize];
-                                
byteBufferRead.position(AutoSwitchHAClient.this.processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE);
-                                byteBufferRead.get(bodyData);
-                                byteBufferRead.position(readSocketPos);
-                                AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
-
-                                long slavePhyOffset = 
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
-                                if (slavePhyOffset != 0) {
-                                    if (slavePhyOffset != masterOffset) {
-                                        LOGGER.error("master pushed offset not 
equal the max phy offset in slave, SLAVE: "
-                                            + slavePhyOffset + " MASTER: " + 
masterOffset);
+                                    if (bodySize > 0) {
+                                        
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 
0, bodyData.length);
+                                    }
+
+                                    if (!reportSlaveMaxOffset()) {
+                                        LOGGER.error("AutoSwitchHAClient 
report max offset to master failed");
                                         return false;
                                     }
-                                }
-
-                                // If epoch changed
-                                if (masterEpoch != 
AutoSwitchHAClient.this.currentReceivedEpoch) {
-                                    
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
-                                    
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, 
masterEpochStartOffset));
-                                }
-                                AutoSwitchHAClient.this.confirmOffset = 
Math.min(confirmOffset, messageStore.getMaxPhyOffset());
-
-                                if (bodySize > 0) {
-                                    
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 
0, bodyData.length);
-                                }
-
-                                if (!reportSlaveMaxOffset()) {
-                                    LOGGER.error("AutoSwitchHAClient report 
max offset to master failed");
-                                    return false;
-                                }
-                                break;
-                            default:
-                                break;
+                                    break;
+                                default:
+                                    break;
+                            }
+                            continue;
                         }
-                        continue;
                     }
-                }
 
-                if (!byteBufferRead.hasRemaining()) {
-                    
byteBufferRead.position(AutoSwitchHAClient.this.processPosition);
-                    byteBufferRead.compact();
-                    AutoSwitchHAClient.this.processPosition = 0;
-                }
+                    if (!byteBufferRead.hasRemaining()) {
+                        
byteBufferRead.position(AutoSwitchHAClient.this.processPosition);
+                        byteBufferRead.compact();
+                        AutoSwitchHAClient.this.processPosition = 0;
+                    }
 
-                break;
+                    break;
+                }
+            } catch (final Exception e) {
+                LOGGER.error("Error when ha client process read request", e);
             }
             return true;
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index a253b4cb1..d8a26e20e 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.FlowMonitor;
@@ -90,8 +91,12 @@ public class AutoSwitchHAConnection implements HAConnection {
         this.socketChannel.configureBlocking(false);
         this.socketChannel.socket().setSoLinger(false, -1);
         this.socketChannel.socket().setTcpNoDelay(true);
-        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
-        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        if (NettySystemConfig.socketSndbufSize > 0) {
+            
this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
+        }
+        if (NettySystemConfig.socketRcvbufSize > 0) {
+            
this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
+        }
         this.writeSocketService = new WriteSocketService(this.socketChannel);
         this.readSocketService = new ReadSocketService(this.socketChannel);
         this.haService.getConnectionCount().incrementAndGet();
@@ -277,58 +282,55 @@ public class AutoSwitchHAConnection implements 
HAConnection {
             @Override
             protected boolean processReadResult(ByteBuffer byteBufferRead) {
                 while (true) {
+                    boolean processSuccess = true;
                     int readSocketPos = byteBufferRead.position();
                     int diff = byteBufferRead.position() - 
ReadSocketService.this.processPosition;
-                    if (diff >= 4) {
+                    if (diff >= AutoSwitchHAClient.MIN_HEADER_SIZE) {
                         int readPosition = 
ReadSocketService.this.processPosition;
                         HAConnectionState slaveState = 
HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
 
                         switch (slaveState) {
                             case HANDSHAKE:
-                                if (diff >= 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE) {
-                                    // AddressLength
-                                    int addressLength = 
byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 
4);
-                                    if (diff < 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
-                                        break;
-                                    }
-                                    // Flag(isSyncFromLastFile)
-                                    short syncFromLastFileFlag = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 8);
-                                    if (syncFromLastFileFlag == 1) {
-                                        
AutoSwitchHAConnection.this.isSyncFromLastFile = true;
-                                    }
-                                    // Flag(isAsyncLearner role)
-                                    short isAsyncLearner = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 6);
-                                    if (isAsyncLearner == 1) {
-                                        
AutoSwitchHAConnection.this.isAsyncLearner = true;
-                                    }
-                                    // Address
-                                    final byte[] addressData = new 
byte[addressLength];
-                                    byteBufferRead.position(readPosition + 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE);
-                                    byteBufferRead.get(addressData);
-                                    AutoSwitchHAConnection.this.slaveAddress = 
new String(addressData);
-
-                                    isSlaveSendHandshake = true;
-                                    byteBufferRead.position(readSocketPos);
-                                    ReadSocketService.this.processPosition += 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength;
-                                    LOGGER.info("Receive slave handshake, 
slaveId:{}, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
-                                        AutoSwitchHAConnection.this.slaveId, 
AutoSwitchHAConnection.this.slaveAddress,
-                                        
AutoSwitchHAConnection.this.isSyncFromLastFile, 
AutoSwitchHAConnection.this.isAsyncLearner);
+                                // AddressLength
+                                int addressLength = 
byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 
4);
+                                if (diff < 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
+                                    processSuccess = false;
+                                    break;
+                                }
+                                // Flag(isSyncFromLastFile)
+                                short syncFromLastFileFlag = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 8);
+                                if (syncFromLastFileFlag == 1) {
+                                    
AutoSwitchHAConnection.this.isSyncFromLastFile = true;
+                                }
+                                // Flag(isAsyncLearner role)
+                                short isAsyncLearner = 
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE 
- 6);
+                                if (isAsyncLearner == 1) {
+                                    AutoSwitchHAConnection.this.isAsyncLearner 
= true;
                                 }
+                                // Address
+                                final byte[] addressData = new 
byte[addressLength];
+                                byteBufferRead.position(readPosition + 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE);
+                                byteBufferRead.get(addressData);
+                                AutoSwitchHAConnection.this.slaveAddress = new 
String(addressData);
+
+                                isSlaveSendHandshake = true;
+                                byteBufferRead.position(readSocketPos);
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength;
+                                LOGGER.info("Receive slave handshake, 
slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
+                                    AutoSwitchHAConnection.this.slaveAddress, 
AutoSwitchHAConnection.this.isSyncFromLastFile, 
AutoSwitchHAConnection.this.isAsyncLearner);
                                 break;
                             case TRANSFER:
-                                if (diff >= 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
-                                    long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
-                                    ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                long slaveMaxOffset = 
byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += 
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
 
-                                    AutoSwitchHAConnection.this.slaveAckOffset 
= slaveMaxOffset;
-                                    if (slaveRequestOffset < 0) {
-                                        slaveRequestOffset = slaveMaxOffset;
-                                    }
-                                    byteBufferRead.position(readSocketPos);
-                                    maybeExpandInSyncStateSet(slaveMaxOffset);
-                                    
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
-                                    LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.slaveAckOffset = 
slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
                                 }
+                                byteBufferRead.position(readSocketPos);
+                                maybeExpandInSyncStateSet(slaveMaxOffset);
+                                
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
                                 break;
                             default:
                                 LOGGER.error("Current state illegal {}", 
currentState);
@@ -339,7 +341,9 @@ public class AutoSwitchHAConnection implements HAConnection 
{
                             LOGGER.warn("Master change state from {} to {}", 
currentState, slaveState);
                             changeCurrentState(slaveState);
                         }
-                        continue;
+                        if (processSuccess) {
+                            continue;
+                        }
                     }
 
                     if (!byteBufferRead.hasRemaining()) {
@@ -379,7 +383,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
         }
 
         @Override
-        protected boolean transferData(int maxTransferSize) {
+        protected boolean transferData(int maxTransferSize) throws Exception {
 
             if (null != this.selectMappedBufferResult && maxTransferSize >= 0) 
{
                 
this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
@@ -489,7 +493,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
             return true;
         }
 
-        private boolean handshakeWithSlave() {
+        private boolean handshakeWithSlave() throws IOException {
             // Write Header
             boolean result = this.haWriter.write(this.socketChannel, 
this.byteBufferHeader);
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 773817072..84b047e2d 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -46,7 +46,7 @@ import 
org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
  */
 public class AutoSwitchHAService extends DefaultHAService {
     private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-    private final ExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
     private final List<Consumer<Set<String>>> syncStateSetChangedListeners = 
new ArrayList<>();
     private final CopyOnWriteArraySet<String> syncStateSet = new 
CopyOnWriteArraySet<>();
     private String localAddress;
@@ -144,6 +144,16 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override public void updateMasterAddress(String newAddr) {
     }
 
+    @Override public void removeConnection(HAConnection conn) {
+        final Set<String> syncStateSet = getSyncStateSet();
+        String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
+        if (syncStateSet.contains(slave)) {
+            syncStateSet.remove(slave);
+            notifySyncStateSetChanged(syncStateSet);
+        }
+        super.removeConnection(conn);
+    }
+
     public void registerSyncStateSetChangedListener(final 
Consumer<Set<String>> listener) {
         this.syncStateSetChangedListeners.add(listener);
     }
@@ -168,7 +178,7 @@ public class AutoSwitchHAService extends DefaultHAService {
             final AutoSwitchHAConnection connection = (AutoSwitchHAConnection) 
haConnection;
             final String slaveAddress = connection.getSlaveAddress();
             if (currentSyncStateSet.contains(slaveAddress)) {
-                if (this.defaultMessageStore.getMaxPhyOffset() == 
connection.getSlaveAckOffset()) {
+                if (connection.getSlaveAckOffset() < 0 || 
this.defaultMessageStore.getMaxPhyOffset() == connection.getSlaveAckOffset()) {
                     continue;
                 }
                 if ((System.currentTimeMillis() - 
connection.getLastCatchUpTimeMs()) > haMaxTimeSlaveNotCatchup) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
index 21daaa6f7..4835ca8ba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
@@ -30,28 +30,22 @@ public class HAWriter {
     private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     protected final List<HAWriteHook> writeHookList = new ArrayList<>();
 
-    public boolean write(SocketChannel socketChannel, ByteBuffer 
byteBufferWrite) {
+    public boolean write(SocketChannel socketChannel, ByteBuffer 
byteBufferWrite) throws IOException {
         int writeSizeZeroTimes = 0;
         while (byteBufferWrite.hasRemaining()) {
-            try {
-                int writeSize = socketChannel.write(byteBufferWrite);
-                for (HAWriteHook writeHook : writeHookList) {
-                    writeHook.afterWrite(writeSize);
-                }
-                if (writeSize > 0) {
-                    writeSizeZeroTimes = 0;
-                } else if (writeSize == 0) {
-                    if (++writeSizeZeroTimes >= 3) {
-                        break;
-                    }
-                } else {
-                    LOGGER.info("Write socket < 0");
+            int writeSize = socketChannel.write(byteBufferWrite);
+            for (HAWriteHook writeHook : writeHookList) {
+                writeHook.afterWrite(writeSize);
+            }
+            if (writeSize > 0) {
+                writeSizeZeroTimes = 0;
+            } else if (writeSize == 0) {
+                if (++writeSizeZeroTimes >= 3) {
+                    break;
                 }
-            } catch (IOException e) {
-                LOGGER.info("Write socket exception", e);
-                return false;
+            } else {
+                LOGGER.info("Write socket < 0");
             }
-
         }
 
         return !byteBufferWrite.hasRemaining();

Reply via email to