This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller
by this push:
new 25ee6eb32 [Summer of code] Support async learner in controller mode
(#4367)
25ee6eb32 is described below
commit 25ee6eb322da4a5c901d60e09b7e255ff0e392d7
Author: hzh0425 <[email protected]>
AuthorDate: Wed May 25 21:41:20 2022 +0800
[Summer of code] Support async learner in controller mode (#4367)
* merge branch support_async_learner
* use isSlave() to replace BrokerRole == Slave
* mark asyncLearner
* code review
* Revert "use isSlave() to replace BrokerRole == Slave"
This reverts commit 6599f97f44ece74e6a2e4cbbfc062c31c96237ec.
* review
* remove asyncLeaner role
* code review
* code review
---
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++
.../store/ha/autoswitch/AutoSwitchHAClient.java | 33 ++++++-------
.../ha/autoswitch/AutoSwitchHAConnection.java | 54 +++++++++++++---------
.../store/ha/autoswitch/AutoSwitchHATest.java | 28 +++++++++++
4 files changed, 85 insertions(+), 40 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 310865805..80ac3f79d 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -306,6 +306,8 @@ public class MessageStoreConfig {
*/
private boolean syncFromLastFile = false;
+ private boolean isAsyncLearner = false;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -1322,4 +1324,12 @@ public class MessageStoreConfig {
public void setScheduleAsyncDeliverMaxResendNum2Blocked(int
scheduleAsyncDeliverMaxResendNum2Blocked) {
this.scheduleAsyncDeliverMaxResendNum2Blocked =
scheduleAsyncDeliverMaxResendNum2Blocked;
}
+
+ public boolean isAsyncLearner() {
+ return isAsyncLearner;
+ }
+
+ public void setAsyncLearner(boolean asyncLearner) {
+ this.isAsyncLearner = asyncLearner;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 81d2ecf17..f784d3873 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -44,9 +44,10 @@ import org.apache.rocketmq.store.ha.io.HAWriter;
public class AutoSwitchHAClient extends ServiceThread implements HAClient {
/**
- * Handshake header buffer size. Schema: state ordinal +
flag(isSyncFromLastFile) + slaveId + slaveAddressLength.
+ * Handshake header buffer size. Schema: state ordinal + Two flags +
slaveAddressLength
+ * Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add
more flags in the future if needed
*/
- public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
+ public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
/**
* Header + slaveAddress.
@@ -97,10 +98,6 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
*/
private volatile long confirmOffset;
- public static final int SYNC_FROM_LAST_FILE = -1;
-
- public static final int SYNC_FROM_FIRST_FILE = -2;
-
public AutoSwitchHAClient(AutoSwitchHAService haService,
DefaultMessageStore defaultMessageStore,
EpochFileCache epochCache) throws IOException {
this.haService = haService;
@@ -256,13 +253,11 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
// Original state
this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
// IsSyncFromLastFile
- if
(this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile())
{
- this.handshakeHeaderBuffer.putInt(SYNC_FROM_LAST_FILE);
- } else {
- this.handshakeHeaderBuffer.putInt(SYNC_FROM_FIRST_FILE);
- }
- // Slave Id
- this.handshakeHeaderBuffer.putLong(this.slaveId.get());
+ short isSyncFromLastFile =
this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile()
? (short)1 : (short) 0;
+ this.handshakeHeaderBuffer.putShort(isSyncFromLastFile);
+ // IsAsyncLearner role
+ short isAsyncLearner =
this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner()
? (short)1 : (short) 0;
+ this.handshakeHeaderBuffer.putShort(isAsyncLearner);
// Address length
this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 :
this.localAddress.length());
// Slave address
@@ -443,12 +438,12 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
int diff = byteBufferRead.position() -
AutoSwitchHAClient.this.processPosition;
if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
int processPosition =
AutoSwitchHAClient.this.processPosition;
- int masterState = byteBufferRead.getInt(processPosition);
- int bodySize = byteBufferRead.getInt(processPosition + 4);
- long masterOffset = byteBufferRead.getLong(processPosition
+ 4 + 4);
- int masterEpoch = byteBufferRead.getInt(processPosition +
4 + 4 + 8);
- long masterEpochStartOffset =
byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
- long confirmOffset =
byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+ int masterState = byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.MSG_HEADER_SIZE - 36);
+ int bodySize = byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
+ long masterOffset = byteBufferRead.getLong(processPosition
+ AutoSwitchHAConnection.MSG_HEADER_SIZE - 28);
+ int masterEpoch = byteBufferRead.getInt(processPosition +
AutoSwitchHAConnection.MSG_HEADER_SIZE - 20);
+ long masterEpochStartOffset =
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 16);
+ long confirmOffset =
byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE
- 8);
if (masterState !=
AutoSwitchHAClient.this.currentState.ordinal()) {
AutoSwitchHAClient.this.processPosition +=
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index f29ff283e..ef03dc583 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -64,6 +64,7 @@ public class AutoSwitchHAConnection implements HAConnection {
private volatile int currentTransferEpoch = -1;
private volatile long currentTransferEpochEndOffset = 0;
private volatile boolean isSyncFromLastFile = false;
+ private volatile boolean isAsyncLearner = false;
private volatile long slaveId = -1;
private volatile String slaveAddress;
@@ -172,13 +173,21 @@ public class AutoSwitchHAConnection implements
HAConnection {
}
}
+ public boolean isAsyncLearner() {
+ return isAsyncLearner;
+ }
+
+ public boolean isSyncFromLastFile() {
+ return isSyncFromLastFile;
+ }
+
private synchronized void updateLastTransferInfo() {
this.lastMasterMaxOffset =
this.haService.getDefaultMessageStore().getMaxPhyOffset();
this.lastTransferTimeMs = System.currentTimeMillis();
}
private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) {
- if (slaveMaxOffset >= this.lastMasterMaxOffset) {
+ if (!this.isAsyncLearner && slaveMaxOffset >=
this.lastMasterMaxOffset) {
this.lastCatchUpTimeMs = Math.max(this.lastTransferTimeMs,
this.lastCatchUpTimeMs);
this.haService.maybeExpandInSyncStateSet(this.slaveAddress,
slaveMaxOffset);
}
@@ -277,30 +286,33 @@ public class AutoSwitchHAConnection implements
HAConnection {
switch (slaveState) {
case HANDSHAKE:
if (diff >=
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE) {
- isSlaveSendHandshake = true;
- // Flag(SyncFromLastFile)
- long syncFromLastFileFlag =
byteBufferRead.getInt(readPosition + 4);
- if (syncFromLastFileFlag ==
AutoSwitchHAClient.SYNC_FROM_LAST_FILE) {
+ // AddressLength
+ int addressLength =
byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE -
4);
+ if (diff <
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
+ break;
+ }
+ // Flag(isSyncFromLastFile)
+ short syncFromLastFileFlag =
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE
- 8);
+ if (syncFromLastFileFlag == 1) {
AutoSwitchHAConnection.this.isSyncFromLastFile = true;
- LOGGER.info("Slave request sync from
lastFile");
}
- // SlaveId
- AutoSwitchHAConnection.this.slaveId =
byteBufferRead.getLong(readPosition + 8);
- // AddressLength
- int addressLength =
byteBufferRead.getInt(readPosition + 16);
- // Address
- if (diff >=
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
- final byte[] addressData = new
byte[addressLength];
- byteBufferRead.position(readPosition +
20);
- byteBufferRead.get(addressData);
-
AutoSwitchHAConnection.this.slaveAddress = new String(addressData);
- } else {
-
AutoSwitchHAConnection.this.slaveAddress = "";
+ // Flag(isAsyncLearner role)
+ short isAsyncLearner =
byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE
- 6);
+ if (isAsyncLearner == 1) {
+
AutoSwitchHAConnection.this.isAsyncLearner = true;
}
- LOGGER.info("Receive slave handshake,
syncFromLastFile:{}, slaveId:{}, slaveAddress:{}",
-
AutoSwitchHAConnection.this.isSyncFromLastFile,
AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.slaveAddress);
+ // Address
+ final byte[] addressData = new
byte[addressLength];
+ byteBufferRead.position(readPosition +
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE);
+ byteBufferRead.get(addressData);
+ AutoSwitchHAConnection.this.slaveAddress =
new String(addressData);
+
+ isSlaveSendHandshake = true;
byteBufferRead.position(readSocketPos);
ReadSocketService.this.processPosition +=
AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength;
+ LOGGER.info("Receive slave handshake,
slaveId:{}, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
+ AutoSwitchHAConnection.this.slaveId,
AutoSwitchHAConnection.this.slaveAddress,
+
AutoSwitchHAConnection.this.isSyncFromLastFile,
AutoSwitchHAConnection.this.isAsyncLearner);
}
break;
case TRANSFER:
@@ -312,10 +324,10 @@ public class AutoSwitchHAConnection implements
HAConnection {
if (slaveRequestOffset < 0) {
slaveRequestOffset = slaveMaxOffset;
}
- LOGGER.info("slave[" + clientAddress + "]
request offset " + slaveMaxOffset);
byteBufferRead.position(readSocketPos);
maybeExpandInSyncStateSet(slaveMaxOffset);
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+ LOGGER.info("slave[" + clientAddress + "]
request offset " + slaveMaxOffset);
}
break;
default:
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 777a62036..8e8f98db0 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -45,6 +45,7 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AutoSwitchHATest {
@@ -184,6 +185,33 @@ public class AutoSwitchHATest {
}
}
+ @Test
+ public void testAsyncLearnerBrokerRole() throws Exception {
+ init(defaultMappedFileSize);
+ ((AutoSwitchHAService)
this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService)
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+
+ storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+ storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+ storeConfig2.setAsyncLearner(true);
+ messageStore1.getHaService().changeToMaster(1);
+ messageStore2.getHaService().changeToSlave("", 1, 2L);
+ messageStore2.getHaService().updateHaMasterAddress(store1HaAddress);
+ Thread.sleep(6000);
+
+ // Put message on master
+ for (int i = 0; i < 10; i++) {
+ messageStore1.putMessage(buildMessage());
+ }
+ Thread.sleep(200);
+
+ checkMessage(messageStore2, 10, 0);
+
+ Thread.sleep(1000);
+ final Set<String> syncStateSet = ((AutoSwitchHAService)
this.messageStore1.getHaService()).getSyncStateSet();
+ assertFalse(syncStateSet.contains("127.0.0.1:8001"));
+ }
+
@Test
public void testOptionAllAckInSyncStateSet() throws Exception {
init(defaultMappedFileSize, true);