This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new aa0216007 [ISSUE #5651] Fix unable to send messages normally due to HA
disconnection (#5667)
aa0216007 is described below
commit aa02160074d50eaa19a3212b68fdc3c10a89f692
Author: mxsm <[email protected]>
AuthorDate: Fri Dec 9 11:23:31 2022 +0800
[ISSUE #5651] Fix unable to send messages normally due to HA disconnection
(#5667)
---
.../store/ha/autoswitch/AutoSwitchHAClient.java | 112 ++++++++++++---------
1 file changed, 62 insertions(+), 50 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 4e0e37aed..ad7644b8f 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -467,65 +467,77 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
return false;
}
- if (diff >=
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize || diff >=
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
- switch (AutoSwitchHAClient.this.currentState) {
- case HANDSHAKE: {
- AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
- // Truncate log
- int entrySize =
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
- final int entryNums = bodySize / entrySize;
- final ArrayList<EpochEntry> epochEntries =
new ArrayList<>(entryNums);
- for (int i = 0; i < entryNums; i++) {
- int epoch =
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
- long startOffset =
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize
+ 4);
- epochEntries.add(new EpochEntry(epoch,
startOffset));
- }
- byteBufferRead.position(readSocketPos);
- AutoSwitchHAClient.this.processPosition +=
bodySize;
- LOGGER.info("Receive handshake,
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset,
epochEntries);
- if (!doTruncate(epochEntries,
masterOffset)) {
- waitForRunning(1000 * 2);
- LOGGER.error("AutoSwitchHAClient
truncate log failed in handshake state");
+ //flag whether the received data is complete
+ boolean isComplete = true;
+ switch (AutoSwitchHAClient.this.currentState) {
+ case HANDSHAKE: {
+ if (diff <
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
+ //The received HANDSHAKE data is not
complete
+ isComplete = false;
+ break;
+ }
+ AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
+ // Truncate log
+ int entrySize =
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+ final int entryNums = bodySize / entrySize;
+ final ArrayList<EpochEntry> epochEntries = new
ArrayList<>(entryNums);
+ for (int i = 0; i < entryNums; i++) {
+ int epoch =
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+ long startOffset =
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize
+ 4);
+ epochEntries.add(new EpochEntry(epoch,
startOffset));
+ }
+ byteBufferRead.position(readSocketPos);
+ AutoSwitchHAClient.this.processPosition +=
bodySize;
+ LOGGER.info("Receive handshake,
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset,
epochEntries);
+ if (!doTruncate(epochEntries, masterOffset)) {
+ waitForRunning(1000 * 2);
+ LOGGER.error("AutoSwitchHAClient truncate
log failed in handshake state");
+ return false;
+ }
+ }
+ break;
+ case TRANSFER: {
+ if (diff <
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize) {
+ //The received TRANSFER data is not
complete
+ isComplete = false;
+ break;
+ }
+ byte[] bodyData = new byte[bodySize];
+
byteBufferRead.position(AutoSwitchHAClient.this.processPosition +
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
+ byteBufferRead.get(bodyData);
+ byteBufferRead.position(readSocketPos);
+ AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
+ long slavePhyOffset =
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+ if (slavePhyOffset != 0) {
+ if (slavePhyOffset != masterOffset) {
+ LOGGER.error("master pushed offset not
equal the max phy offset in slave, SLAVE: "
+ + slavePhyOffset + " MASTER: " +
masterOffset);
return false;
}
}
- break;
- case TRANSFER: {
- byte[] bodyData = new byte[bodySize];
-
byteBufferRead.position(AutoSwitchHAClient.this.processPosition +
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
- byteBufferRead.get(bodyData);
- byteBufferRead.position(readSocketPos);
- AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
- long slavePhyOffset =
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
- if (slavePhyOffset != 0) {
- if (slavePhyOffset != masterOffset) {
- LOGGER.error("master pushed offset
not equal the max phy offset in slave, SLAVE: "
- + slavePhyOffset + " MASTER: "
+ masterOffset);
- return false;
- }
- }
- // If epoch changed
- if (masterEpoch !=
AutoSwitchHAClient.this.currentReceivedEpoch) {
-
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
-
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch,
masterEpochStartOffset));
- }
+ // If epoch changed
+ if (masterEpoch !=
AutoSwitchHAClient.this.currentReceivedEpoch) {
+
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch,
masterEpochStartOffset));
+ }
- if (bodySize > 0) {
-
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData,
0, bodyData.length);
- }
+ if (bodySize > 0) {
+
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData,
0, bodyData.length);
+ }
-
haService.updateConfirmOffset(Math.min(confirmOffset,
messageStore.getMaxPhyOffset()));
+
haService.updateConfirmOffset(Math.min(confirmOffset,
messageStore.getMaxPhyOffset()));
- if (!reportSlaveMaxOffset()) {
- LOGGER.error("AutoSwitchHAClient
report max offset to master failed");
- return false;
- }
- break;
+ if (!reportSlaveMaxOffset()) {
+ LOGGER.error("AutoSwitchHAClient report
max offset to master failed");
+ return false;
}
- default:
- break;
+ break;
}
+ default:
+ break;
+ }
+ if (isComplete) {
continue;
}