This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2f460855d [ISSUE #5668] Polish AutoSwitchHAClient and
AutoSwitchHAConnection Comments (#5669)
2f460855d is described below
commit 2f460855de0e93ad748c40d24ce571fbf7d8baad
Author: mxsm <[email protected]>
AuthorDate: Fri Dec 9 14:39:25 2022 +0800
[ISSUE #5668] Polish AutoSwitchHAClient and AutoSwitchHAConnection Comments
(#5669)
---
.../store/ha/autoswitch/AutoSwitchHAClient.java | 48 ++++++++++++++++++++--
.../ha/autoswitch/AutoSwitchHAConnection.java | 28 ++++++-------
2 files changed, 59 insertions(+), 17 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index ad7644b8f..2c3ab85f7 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -45,18 +45,60 @@ import org.apache.rocketmq.store.ha.io.HAWriter;
public class AutoSwitchHAClient extends ServiceThread implements HAClient {
/**
- * Handshake header buffer size. Schema: state ordinal + Two flags +
slaveAddressLength
+ * Handshake header buffer size. Schema: state ordinal + Two flags +
slaveAddressLength. Format:
+ *
+ * <pre>
+ * ┌──────────────────┬───────────────┐
+ * │isSyncFromLastFile│ isAsyncLearner│
+ * │ (2bytes) │ (2bytes) │
+ * └──────────────────┴───────────────┘
+ * \ /
+ * \ /
+ * ╲ /
+ * ╲ /
+ *
┌───────────────────────┬───────────────────────┬───────────────────────┐
+ * │ current state │ Flags │ slaveAddressLength
│
+ * │ (4bytes) │ (4bytes) │ (4bytes)
│
+ *
├───────────────────────┴───────────────────────┴───────────────────────┤
+ * │
│
+ * │ HANDSHAKE Header
│
+ * </pre>
+ * <p>
* Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add
more flags in the future if needed
*/
public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
/**
- * Header + slaveAddress.
+ * Header + slaveAddress, Format:
+ * <pre>
+ * ┌──────────────────┬───────────────┐
+ * │isSyncFromLastFile│ isAsyncLearner│
+ * │ (2bytes) │ (2bytes) │
+ * └──────────────────┴───────────────┘
+ * \ /
+ * \ /
+ * ╲ /
+ * ╲ /
+ *
┌───────────────────────┬───────────────────────┬───────────────────────┬───────────────────────────────┐
+ * │ current state │ Flags │ slaveAddressLength
│ slaveAddress │
+ * │ (4bytes) │ (4bytes) │ (4bytes)
│ (50bytes) │
+ *
├───────────────────────┴───────────────────────┴───────────────────────┼───────────────────────────────┤
+ * │
│ │
+ * │ HANDSHAKE Header
│ body │
+ * </pre>
*/
public static final int HANDSHAKE_SIZE = HANDSHAKE_HEADER_SIZE + 50;
/**
- * Transfer header buffer size. Schema: state ordinal + maxOffset.
+ * Transfer header buffer size. Schema: state ordinal + maxOffset. Format:
+ * <pre>
+ * ┌───────────────────────┬───────────────────────┐
+ * │ current state │ maxOffset │
+ * │ (4bytes) │ (8bytes) │
+ * ├───────────────────────┴───────────────────────┤
+ * │ │
+ * │ TRANSFER Header │
+ * </pre>
*/
public static final int TRANSFER_HEADER_SIZE = 4 + 8;
public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE,
TRANSFER_HEADER_SIZE);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 1afb9f6de..7401574e5 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -44,30 +44,30 @@ public class AutoSwitchHAConnection implements HAConnection
{
/**
* Handshake data protocol in syncing msg from master. Format:
* <pre>
- *
+----------------------------------------------------------------------------------------------+
- * | current state | body size | offset | epoch |
EpochEntrySize * EpochEntryNums |
- * | (4bytes) | (4bytes) | (8bytes) | (4bytes) |
(12bytes * EpochEntryNums) |
- *
+----------------------------------------------------------------------------------------------+
- * | Header |
Body |
- * | |
|
+ *
┌─────────────────┬───────────────┬───────────┬───────────┬────────────────────────────────────┐
+ * │ current state │ body size │ offset │ epoch │
EpochEntrySize * EpochEntryNums │
+ * │ (4bytes) │ (4bytes) │ (8bytes) │ (4bytes) │
(12bytes * EpochEntryNums) │
+ *
├─────────────────┴───────────────┴───────────┴───────────┼────────────────────────────────────┤
+ * │ Header │
Body │
+ * │ │
│
* </pre>
* Handshake Header protocol Format:
- * current state + body size + offset + epoch
+ * current state + body size + offset + epoch
*/
public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
/**
* Transfer data protocol in syncing msg from master. Format:
* <pre>
- *
+---------------------------------------------------------------------------------------------------------------------+
- * | current state | body size | offset | epoch |
epochStartOffset | confirmOffset | log data |
- * | (4bytes) | (4bytes) | (8bytes) | (4bytes) |
(8bytes) | (8bytes) | (data size) |
- *
+---------------------------------------------------------------------------------------------------------------------+
- * | Header
| Body |
- * |
| |
+ *
┌─────────────────┬───────────────┬───────────┬───────────┬─────────────────────┬──────────────────┬──────────────────┐
+ * │ current state │ body size │ offset │ epoch │
epochStartOffset │ confirmOffset │ log data │
+ * │ (4bytes) │ (4bytes) │ (8bytes) │ (4bytes) │
(8bytes) │ (8bytes) │ (data size) │
+ *
├─────────────────┴───────────────┴───────────┴───────────┴─────────────────────┴──────────────────┼──────────────────┤
+ * │ Header
│ Body │
+ * │
│ │
* </pre>
* Transfer Header protocol Format:
- * current state + body size + offset + epoch + epochStartOffset +
additionalInfo(confirmOffset)
+ * current state + body size + offset + epoch + epochStartOffset +
additionalInfo(confirmOffset)
*/
public static final int TRANSFER_HEADER_SIZE = HANDSHAKE_HEADER_SIZE + 8 +
8;
public static final int EPOCH_ENTRY_SIZE = 12;