This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aa0216007 [ISSUE #5651] Fix unable to send messages normally due to HA disconnection (#5667)
aa0216007 is described below

commit aa02160074d50eaa19a3212b68fdc3c10a89f692
Author: mxsm <[email protected]>
AuthorDate: Fri Dec 9 11:23:31 2022 +0800

    [ISSUE #5651] Fix unable to send messages normally due to HA disconnection (#5667)
---
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 112 ++++++++++++---------
 1 file changed, 62 insertions(+), 50 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 4e0e37aed..ad7644b8f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -467,65 +467,77 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                             return false;
                         }
 
-                        if (diff >= 
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize || diff >= 
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
-                            switch (AutoSwitchHAClient.this.currentState) {
-                                case HANDSHAKE: {
-                                    AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
-                                    // Truncate log
-                                    int entrySize = 
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
-                                    final int entryNums = bodySize / entrySize;
-                                    final ArrayList<EpochEntry> epochEntries = 
new ArrayList<>(entryNums);
-                                    for (int i = 0; i < entryNums; i++) {
-                                        int epoch = 
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
-                                        long startOffset = 
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize 
+ 4);
-                                        epochEntries.add(new EpochEntry(epoch, 
startOffset));
-                                    }
-                                    byteBufferRead.position(readSocketPos);
-                                    AutoSwitchHAClient.this.processPosition += 
bodySize;
-                                    LOGGER.info("Receive handshake, 
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, 
epochEntries);
-                                    if (!doTruncate(epochEntries, 
masterOffset)) {
-                                        waitForRunning(1000 * 2);
-                                        LOGGER.error("AutoSwitchHAClient 
truncate log failed in handshake state");
+                        //flag whether the received data is complete
+                        boolean isComplete = true;
+                        switch (AutoSwitchHAClient.this.currentState) {
+                            case HANDSHAKE: {
+                                if (diff < 
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
+                                    //The received HANDSHAKE data is not 
complete
+                                    isComplete = false;
+                                    break;
+                                }
+                                AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
+                                // Truncate log
+                                int entrySize = 
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+                                final int entryNums = bodySize / entrySize;
+                                final ArrayList<EpochEntry> epochEntries = new 
ArrayList<>(entryNums);
+                                for (int i = 0; i < entryNums; i++) {
+                                    int epoch = 
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+                                    long startOffset = 
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize 
+ 4);
+                                    epochEntries.add(new EpochEntry(epoch, 
startOffset));
+                                }
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += 
bodySize;
+                                LOGGER.info("Receive handshake, 
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, 
epochEntries);
+                                if (!doTruncate(epochEntries, masterOffset)) {
+                                    waitForRunning(1000 * 2);
+                                    LOGGER.error("AutoSwitchHAClient truncate 
log failed in handshake state");
+                                    return false;
+                                }
+                            }
+                            break;
+                            case TRANSFER: {
+                                if (diff < 
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize) {
+                                    //The received TRANSFER data is not 
complete
+                                    isComplete = false;
+                                    break;
+                                }
+                                byte[] bodyData = new byte[bodySize];
+                                
byteBufferRead.position(AutoSwitchHAClient.this.processPosition + 
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
+                                byteBufferRead.get(bodyData);
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
+                                long slavePhyOffset = 
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+                                if (slavePhyOffset != 0) {
+                                    if (slavePhyOffset != masterOffset) {
+                                        LOGGER.error("master pushed offset not 
equal the max phy offset in slave, SLAVE: "
+                                            + slavePhyOffset + " MASTER: " + 
masterOffset);
                                         return false;
                                     }
                                 }
-                                break;
-                                case TRANSFER: {
-                                    byte[] bodyData = new byte[bodySize];
-                                    
byteBufferRead.position(AutoSwitchHAClient.this.processPosition + 
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
-                                    byteBufferRead.get(bodyData);
-                                    byteBufferRead.position(readSocketPos);
-                                    AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
-                                    long slavePhyOffset = 
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
-                                    if (slavePhyOffset != 0) {
-                                        if (slavePhyOffset != masterOffset) {
-                                            LOGGER.error("master pushed offset 
not equal the max phy offset in slave, SLAVE: "
-                                                + slavePhyOffset + " MASTER: " 
+ masterOffset);
-                                            return false;
-                                        }
-                                    }
 
-                                    // If epoch changed
-                                    if (masterEpoch != 
AutoSwitchHAClient.this.currentReceivedEpoch) {
-                                        
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
-                                        
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, 
masterEpochStartOffset));
-                                    }
+                                // If epoch changed
+                                if (masterEpoch != 
AutoSwitchHAClient.this.currentReceivedEpoch) {
+                                    
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+                                    
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, 
masterEpochStartOffset));
+                                }
 
-                                    if (bodySize > 0) {
-                                        
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 
0, bodyData.length);
-                                    }
+                                if (bodySize > 0) {
+                                    
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 
0, bodyData.length);
+                                }
 
-                                    
haService.updateConfirmOffset(Math.min(confirmOffset, 
messageStore.getMaxPhyOffset()));
+                                
haService.updateConfirmOffset(Math.min(confirmOffset, 
messageStore.getMaxPhyOffset()));
 
-                                    if (!reportSlaveMaxOffset()) {
-                                        LOGGER.error("AutoSwitchHAClient 
report max offset to master failed");
-                                        return false;
-                                    }
-                                    break;
+                                if (!reportSlaveMaxOffset()) {
+                                    LOGGER.error("AutoSwitchHAClient report 
max offset to master failed");
+                                    return false;
                                 }
-                                default:
-                                    break;
+                                break;
                             }
+                            default:
+                                break;
+                        }
+                        if (isComplete) {
                             continue;
                         }
 

Reply via email to