This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0c3b40586 [ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's
data protocol (#5182)
0c3b40586 is described below
commit 0c3b40586a2b7ec676247f54be32d1466124dda8
Author: mxsm <[email protected]>
AuthorDate: Thu Nov 17 20:06:34 2022 +0800
[ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol
(#5182)
---
.../store/ha/autoswitch/AutoSwitchHAClient.java | 45 +++++++++++++---------
.../ha/autoswitch/AutoSwitchHAConnection.java | 42 ++++++++++++++------
2 files changed, 57 insertions(+), 30 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 7461279c7..fc85d4054 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -445,27 +445,32 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
try {
while (true) {
int diff = byteBufferRead.position() -
AutoSwitchHAClient.this.processPosition;
- if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
- int processPosition =
AutoSwitchHAClient.this.processPosition;
- int masterState =
byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 36);
- int bodySize = byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
- long masterOffset =
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 28);
- int masterEpoch =
byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 20);
- long masterEpochStartOffset =
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 16);
- long confirmOffset =
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 8);
-
+ if (diff >= AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE) {
+ final int processPosition =
AutoSwitchHAClient.this.processPosition;
+ int masterState =
byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 20);
+ int bodySize = byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 16);
+ long masterOffset =
byteBufferRead.getLong(processPosition +
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 12);
+ int masterEpoch =
byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 4);
+ long masterEpochStartOffset = 0;
+ long confirmOffset = 0;
+ // if master send transfer header data, set
masterEpochStartOffset and confirmOffset value.
+ if (masterState ==
HAConnectionState.TRANSFER.ordinal() && diff >=
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE) {
+ masterEpochStartOffset =
byteBufferRead.getLong(processPosition +
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 16);
+ confirmOffset =
byteBufferRead.getLong(processPosition +
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 8);
+ }
if (masterState !=
AutoSwitchHAClient.this.currentState.ordinal()) {
- AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+ int headerSize = masterState ==
HAConnectionState.TRANSFER.ordinal() ?
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE :
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
+ AutoSwitchHAClient.this.processPosition +=
headerSize + bodySize;
AutoSwitchHAClient.this.waitForRunning(1);
LOGGER.error("State not matched, masterState:{},
slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{},
masterEpochStartOffset:{}, confirmOffset:{}",
masterState,
AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch,
masterEpochStartOffset, confirmOffset);
- return true;
+ return false;
}
- if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE +
bodySize)) {
+ if (diff >=
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize || diff >=
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
switch (AutoSwitchHAClient.this.currentState) {
- case HANDSHAKE:
- AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.MSG_HEADER_SIZE;
+ case HANDSHAKE: {
+ AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
// Truncate log
int entrySize =
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
final int entryNums = bodySize / entrySize;
@@ -483,14 +488,14 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
LOGGER.error("AutoSwitchHAClient
truncate log failed in handshake state");
return false;
}
- break;
- case TRANSFER:
+ }
+ break;
+ case TRANSFER: {
byte[] bodyData = new byte[bodySize];
-
byteBufferRead.position(AutoSwitchHAClient.this.processPosition +
AutoSwitchHAConnection.MSG_HEADER_SIZE);
+
byteBufferRead.position(AutoSwitchHAClient.this.processPosition +
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
byteBufferRead.get(bodyData);
byteBufferRead.position(readSocketPos);
- AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
-
+ AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
long slavePhyOffset =
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterOffset) {
@@ -517,11 +522,13 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
return false;
}
break;
+ }
default:
break;
}
continue;
}
+
}
if (!byteBufferRead.hasRemaining()) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index de20625aa..755b89ade 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -40,12 +40,36 @@ import org.apache.rocketmq.store.ha.io.AbstractHAReader;
import org.apache.rocketmq.store.ha.io.HAWriter;
public class AutoSwitchHAConnection implements HAConnection {
+
+ /**
+ * Handshake data protocol in syncing msg from master. Format:
+ * <pre>
+ *
+----------------------------------------------------------------------------------------------+
+ * | current state | body size | offset | epoch |
EpochEntrySize * EpochEntryNums |
+ * | (4bytes) | (4bytes) | (8bytes) | (4bytes) |
(12bytes * EpochEntryNums) |
+ *
+----------------------------------------------------------------------------------------------+
+ * | Header |
Body |
+ * | |
|
+ * </pre>
+ * Handshake Header protocol Format:
+ * current state + body size + offset + epoch
+ */
+ public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
+
/**
- * Header protocol in syncing msg from master. Format: current state +
body size + offset + epoch +
- * epochStartOffset + additionalInfo(confirmOffset). If the msg is
handShakeMsg, the body size = EpochEntrySize *
- * EpochEntryNums, the offset is maxOffset in master.
+ * Transfer data protocol in syncing msg from master. Format:
+ * <pre>
+ *
+---------------------------------------------------------------------------------------------------------------------+
+ * | current state | body size | offset | epoch |
epochStartOffset | confirmOffset | log data |
+ * | (4bytes) | (4bytes) | (8bytes) | (4bytes) |
(8bytes) | (8bytes) | (data size) |
+ *
+---------------------------------------------------------------------------------------------------------------------+
+ * | Header
| Body |
+ * |
| |
+ * </pre>
+ * Transfer Header protocol Format:
+ * current state + body size + offset + epoch + epochStartOffset +
additionalInfo(confirmOffset)
*/
- public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+ public static final int TRANSFER_HEADER_SIZE = HANDSHAKE_HEADER_SIZE + 8 +
8;
public static final int EPOCH_ENTRY_SIZE = 12;
private static final InternalLogger LOGGER =
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final AutoSwitchHAService haService;
@@ -433,7 +457,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
protected final SocketChannel socketChannel;
protected final HAWriter haWriter;
- protected final ByteBuffer byteBufferHeader =
ByteBuffer.allocate(MSG_HEADER_SIZE);
+ protected final ByteBuffer byteBufferHeader =
ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
// Store master epochFileCache: (Epoch + startOffset) * 1000
private final ByteBuffer handShakeBuffer =
ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
protected long nextTransferFromWhere = -1;
@@ -466,7 +490,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
final int lastEpoch =
AutoSwitchHAConnection.this.epochCache.lastEpoch();
final long maxPhyOffset =
AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+ this.byteBufferHeader.limit(HANDSHAKE_HEADER_SIZE);
// State
this.byteBufferHeader.putInt(currentState.ordinal());
// Body size
@@ -475,10 +499,6 @@ public class AutoSwitchHAConnection implements
HAConnection {
this.byteBufferHeader.putLong(maxPhyOffset);
// Epoch
this.byteBufferHeader.putInt(lastEpoch);
- // EpochStartOffset (not needed in handshake)
- this.byteBufferHeader.putLong(0L);
- // Additional info (not needed in handshake)
- this.byteBufferHeader.putLong(0L);
this.byteBufferHeader.flip();
// EpochEntries
@@ -527,7 +547,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
}
// Build Header
this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+ this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
// State
this.byteBufferHeader.putInt(currentState.ordinal());
// Body size