This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller
by this push:
new f5c5f5c4f [Summer of code] Shrink and expand InSyncStateSet (#4355)
f5c5f5c4f is described below
commit f5c5f5c4f0ac9fe4bfbea0cf7a94c7037abdf716
Author: hzh0425 <[email protected]>
AuthorDate: Tue May 24 18:12:45 2022 +0800
[Summer of code] Shrink and expand InSyncStateSet (#4355)
* feature: add lastCatchupTime ms and expandInSyncStateSet
* code review
* add shrink and expand inSyncStateSet in AutoSwitchHAService
* add option allAckInSyncStateSet
* let replicasManager use AutoSwitchHAService's expand and shrink
inSyncStateSet api.
* fix bug
* use CopyOnWriteArraySet to replace lock
* code review
* code review
* code review
* code review
---
.../broker/hacontroller/ReplicasManager.java | 27 ++--
.../java/org/apache/rocketmq/store/CommitLog.java | 19 ++-
.../rocketmq/store/config/MessageStoreConfig.java | 27 ++++
.../apache/rocketmq/store/ha/DefaultHAService.java | 7 -
.../rocketmq/store/ha/GroupTransferService.java | 45 ++++--
.../org/apache/rocketmq/store/ha/HAService.java | 12 --
.../ha/autoswitch/AutoSwitchHAConnection.java | 35 ++++-
.../store/ha/autoswitch/AutoSwitchHAService.java | 166 ++++++++++++++-------
.../store/ha/autoswitch/AutoSwitchHATest.java | 86 ++++++++++-
.../ha/{ => autoswitch}/EpochFileCacheTest.java | 3 +-
10 files changed, 320 insertions(+), 107 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index b59995ae2..41addf37c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -134,6 +134,9 @@ public class ReplicasManager {
}
schedulingSyncBrokerMetadata();
+
+ // Register syncStateSet changed listener.
+
this.haService.registerSyncStateSetChangedListener(this::doReportSyncStateSetChanged);
return true;
}
@@ -346,7 +349,7 @@ public class ReplicasManager {
*/
private void schedulingCheckSyncStateSet() {
this.checkSyncStateSetTaskFuture =
this.scheduledService.scheduleAtFixedRate(() -> {
- final Set<String> newSyncStateSet =
this.haService.getLatestSyncStateSet();
+ final Set<String> newSyncStateSet =
this.haService.maybeShrinkInSyncStateSet();
newSyncStateSet.add(this.localAddress);
synchronized (this) {
if (this.syncStateSet != null) {
@@ -356,18 +359,22 @@ public class ReplicasManager {
}
}
}
- try {
- final SyncStateSet result =
this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress,
this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch,
newSyncStateSet, this.syncStateSetEpoch);
- if (result != null) {
- changeSyncStateSet(result.getSyncStateSet(),
result.getSyncStateSetEpoch());
- }
- } catch (final Exception e) {
- LOGGER.error("Error happen when change sync state set,
broker:{}, masterAddress:{}, masterEpoch:{}, oldSyncStateSet:{},
newSyncStateSet:{}, syncStateSetEpoch:{}",
- this.brokerConfig.getBrokerName(), this.masterAddress,
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch,
e);
- }
+ doReportSyncStateSetChanged(newSyncStateSet);
}, 3 * 1000,
this.brokerConfig.getReplicasManagerCheckSyncStateSetPeriod(),
TimeUnit.MILLISECONDS);
}
+ private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) {
+ try {
+ final SyncStateSet result =
this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress,
this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch,
newSyncStateSet, this.syncStateSetEpoch);
+ if (result != null) {
+ changeSyncStateSet(result.getSyncStateSet(),
result.getSyncStateSetEpoch());
+ }
+ } catch (final Exception e) {
+ LOGGER.error("Error happen when change sync state set, broker:{},
masterAddress:{}, masterEpoch:{}, oldSyncStateSet:{}, newSyncStateSet:{},
syncStateSetEpoch:{}",
+ this.brokerConfig.getBrokerName(), this.masterAddress,
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch,
e);
+ }
+ }
+
private void stopCheckSyncStateSet() {
if (this.checkSyncStateSetTaskFuture != null) {
this.checkSyncStateSetTaskFuture.cancel(false);
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index be013e9d4..5a96b5345 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -805,7 +805,7 @@ public class CommitLog implements Swappable {
boolean needHandleHA = needHandleHA(msg);
int needAckNums = 1;
- if (needHandleHA) {
+ if (needHandleHA &&
!this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
int inSyncReplicas =
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
@@ -952,7 +952,7 @@ public class CommitLog implements Swappable {
int needAckNums = 1;
boolean needHandleHA = needHandleHA(messageExtBatch);
- if (needHandleHA) {
+ if (needHandleHA &&
!this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
int inSyncReplicas =
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
@@ -1103,7 +1103,8 @@ public class CommitLog implements Swappable {
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult
result, PutMessageResult putMessageResult,
int needAckNums) {
- if (needAckNums <= 1) {
+ final boolean allAckInSyncStateSet =
this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet();
+ if (needAckNums <= 1 && !allAckInSyncStateSet) {
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
@@ -1123,7 +1124,7 @@ public class CommitLog implements Swappable {
// }
// Wait enough acks from different slaves
- GroupCommitRequest request = new GroupCommitRequest(nextOffset,
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums
- 1);
+ GroupCommitRequest request = new GroupCommitRequest(nextOffset,
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums
- 1, allAckInSyncStateSet);
haService.putRequest(request);
haService.getWaitNotifyObject().wakeupAll();
return request.future();
@@ -1390,6 +1391,7 @@ public class CommitLog implements Swappable {
private final CompletableFuture<PutMessageStatus> flushOKFuture = new
CompletableFuture<>();
private volatile int ackNums = 1;
private final long deadLine;
+ private boolean allAckInSyncStateSet;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
@@ -1401,6 +1403,11 @@ public class CommitLog implements Swappable {
this.ackNums = ackNums;
}
+ public GroupCommitRequest(long nextOffset, long timeoutMillis, int
ackNums, boolean allAckInSyncStateSet) {
+ this(nextOffset, timeoutMillis, ackNums);
+ this.allAckInSyncStateSet = allAckInSyncStateSet;
+ }
+
public long getNextOffset() {
return nextOffset;
}
@@ -1413,6 +1420,10 @@ public class CommitLog implements Swappable {
return deadLine;
}
+ public boolean isAllAckInSyncStateSet() {
+ return allAckInSyncStateSet;
+ }
+
public void wakeupCustomer(final PutMessageStatus status) {
this.flushOKFuture.complete(status);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index a182e07cf..310865805 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -246,6 +246,12 @@ public class MessageStoreConfig {
@ImportantField
private int minInSyncReplicas = 1;
+ /**
+ * Each message must be written successfully to all replicas in
InSyncStateSet.
+ */
+ @ImportantField
+ private boolean allAckInSyncStateSet = false;
+
/**
* Dynamically adjust in-sync replicas to provide higher availability, the
real time in-sync replicas
* will smaller than inSyncReplicas config.
@@ -264,6 +270,11 @@ public class MessageStoreConfig {
*/
private long maxHaTransferByteInSecond = 100 * 1024 * 1024;
+ /**
+ * The max time (ms) a slave may go without catching up to master before it is shrunk out of InSyncStateSet.
+ */
+ private long haMaxTimeSlaveNotCatchup = 1000 * 15;
+
/**
* Sync flush offset from master when broker startup, used in upgrading
from old version broker.
*/
@@ -1160,6 +1171,14 @@ public class MessageStoreConfig {
this.minInSyncReplicas = minInSyncReplicas;
}
+ public boolean isAllAckInSyncStateSet() {
+ return allAckInSyncStateSet;
+ }
+
+ public void setAllAckInSyncStateSet(boolean allAckInSyncStateSet) {
+ this.allAckInSyncStateSet = allAckInSyncStateSet;
+ }
+
public boolean isEnableAutoInSyncReplicas() {
return enableAutoInSyncReplicas;
}
@@ -1184,6 +1203,14 @@ public class MessageStoreConfig {
this.maxHaTransferByteInSecond = maxHaTransferByteInSecond;
}
+ public long getHaMaxTimeSlaveNotCatchup() {
+ return haMaxTimeSlaveNotCatchup;
+ }
+
+ public void setHaMaxTimeSlaveNotCatchup(long haMaxTimeSlaveNotCatchup) {
+ this.haMaxTimeSlaveNotCatchup = haMaxTimeSlaveNotCatchup;
+ }
+
public boolean isSyncMasterFlushOffsetWhenStartup() {
return syncMasterFlushOffsetWhenStartup;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
index 3c346ff93..ea27288be 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
@@ -156,13 +156,6 @@ public class DefaultHAService implements HAService {
return false;
}
- @Override public Set<String> getLatestSyncStateSet() {
- return null;
- }
-
- @Override public void setSyncStateSet(Set<String> syncStateSet) {
- }
-
public void destroyConnections() {
synchronized (this.connectionList) {
for (HAConnection c : this.connectionList) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
index dc41279b5..67ada60dd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store.ha;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -26,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAConnection;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
/**
* GroupTransferService Service
@@ -71,28 +74,52 @@ public class GroupTransferService extends ServiceThread {
boolean transferOK = false;
long deadLine = req.getDeadLine();
+ final boolean allAckInSyncStateSet =
req.isAllAckInSyncStateSet();
for (int i = 0; !transferOK && deadLine -
System.nanoTime() > 0; i++) {
if (i > 0) {
this.notifyTransferObject.waitForRunning(1000);
}
- if (req.getAckNums() <= 1) {
+ if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
transferOK =
haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
continue;
}
- int ackNums = 0;
- for (HAConnection conn :
haService.getConnectionList()) {
- // TODO: We must ensure every HAConnection
represents a different slave
- // Solution: Consider assign a unique and fixed
IP:ADDR for each different slave
- if (conn.getSlaveAckOffset() >=
req.getNextOffset()) {
- ackNums++;
- }
- if (ackNums >= req.getAckNums()) {
+ if (allAckInSyncStateSet && this.haService instanceof
AutoSwitchHAService) {
+ // In this mode, we must wait for all replicas
that are in InSyncStateSet.
+ final AutoSwitchHAService autoSwitchHAService =
(AutoSwitchHAService) this.haService;
+ final Set<String> syncStateSet =
autoSwitchHAService.getSyncStateSet();
+ if (syncStateSet.size() <= 1) {
+ // Only master
transferOK = true;
break;
}
+ // Include master.
+ int ackNums = 1;
+ for (HAConnection conn :
haService.getConnectionList()) {
+ final AutoSwitchHAConnection
autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
+ if
(syncStateSet.contains(autoSwitchHAConnection.getClientAddress()) &&
autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
+ ackNums ++;
+ }
+ if (ackNums >= syncStateSet.size()) {
+ transferOK = true;
+ break;
+ }
+ }
+ } else {
+ int ackNums = 0;
+ for (HAConnection conn :
haService.getConnectionList()) {
+ // TODO: We must ensure every HAConnection
represents a different slave
+ // Solution: Consider assign a unique and
fixed IP:ADDR for each different slave
+ if (conn.getSlaveAckOffset() >=
req.getNextOffset()) {
+ ackNums++;
+ }
+ if (ackNums >= req.getAckNums()) {
+ transferOK = true;
+ break;
+ }
+ }
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index b5dbd1fac..4006ff774 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store.ha;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
@@ -62,17 +61,6 @@ public interface HAService {
*/
boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long
slaveId);
- /**
- * Get the latest sync state set.
- * @return new syncStateSet
- */
- Set<String> getLatestSyncStateSet();
-
- /**
- * Set sync state set
- */
- void setSyncStateSet(Set<String> syncStateSet);
-
/**
* Update master address
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index c5993684b..f29ff283e 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -52,6 +52,8 @@ public class AutoSwitchHAConnection implements HAConnection {
private final EpochFileCache epochCache;
private final AbstractWriteSocketService writeSocketService;
private final ReadSocketService readSocketService;
+ private final FlowMonitor flowMonitor;
+
private volatile HAConnectionState currentState =
HAConnectionState.HANDSHAKE;
private volatile long slaveRequestOffset = -1;
private volatile long slaveAckOffset = -1;
@@ -65,7 +67,18 @@ public class AutoSwitchHAConnection implements HAConnection {
private volatile long slaveId = -1;
private volatile String slaveAddress;
- private final FlowMonitor flowMonitor;
+ /**
+ * Last endOffset when master transfers data to slave
+ */
+ private volatile long lastMasterMaxOffset = -1;
+ /**
+ * Last time (ms) when master transferred data to slave.
+ */
+ private volatile long lastTransferTimeMs = 0;
+ /**
+ * Last catchup time ms when slaveAckOffset >= lastMasterMaxOffset.
+ */
+ private volatile long lastCatchUpTimeMs = 0;
public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel
socketChannel,
EpochFileCache epochCache) throws IOException {
@@ -122,6 +135,10 @@ public class AutoSwitchHAConnection implements
HAConnection {
return slaveAddress;
}
+ public long getLastCatchUpTimeMs() {
+ return lastCatchUpTimeMs;
+ }
+
@Override public HAConnectionState getCurrentState() {
return currentState;
}
@@ -155,6 +172,18 @@ public class AutoSwitchHAConnection implements
HAConnection {
}
}
+ private synchronized void updateLastTransferInfo() {
+ this.lastMasterMaxOffset =
this.haService.getDefaultMessageStore().getMaxPhyOffset();
+ this.lastTransferTimeMs = System.currentTimeMillis();
+ }
+
+ private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) {
+ if (slaveMaxOffset >= this.lastMasterMaxOffset) {
+ this.lastCatchUpTimeMs = Math.max(this.lastTransferTimeMs,
this.lastCatchUpTimeMs);
+ this.haService.maybeExpandInSyncStateSet(this.slaveAddress,
slaveMaxOffset);
+ }
+ }
+
class ReadSocketService extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
private final Selector selector;
@@ -279,12 +308,13 @@ public class AutoSwitchHAConnection implements
HAConnection {
long slaveMaxOffset =
byteBufferRead.getLong(readPosition + 4);
ReadSocketService.this.processPosition +=
AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
- slaveAckOffset = slaveMaxOffset;
+ AutoSwitchHAConnection.this.slaveAckOffset
= slaveMaxOffset;
if (slaveRequestOffset < 0) {
slaveRequestOffset = slaveMaxOffset;
}
LOGGER.info("slave[" + clientAddress + "]
request offset " + slaveMaxOffset);
byteBufferRead.position(readSocketPos);
+ maybeExpandInSyncStateSet(slaveMaxOffset);
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
}
break;
@@ -547,6 +577,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
this.transferOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
+ updateLastTransferInfo();
// Build Header
buildTransferHeaderBuffer(this.transferOffset, size);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index c5768e1b0..98b480d91 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -19,10 +19,16 @@ package org.apache.rocketmq.store.ha.autoswitch;
import java.io.IOException;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -41,21 +47,22 @@ import
org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
*/
public class AutoSwitchHAService extends DefaultHAService {
private static final InternalLogger LOGGER =
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private final ExecutorService executorService =
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+ private final List<Consumer<Set<String>>> syncStateSetChangedListeners =
new ArrayList<>();
+ private final CopyOnWriteArraySet<String> syncStateSet = new
CopyOnWriteArraySet<>();
+ private String localAddress;
+
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
- private volatile Set<String> syncStateSet;
- private String localAddress;
public AutoSwitchHAService() {
}
- @Override
- public void init(final DefaultMessageStore defaultMessageStore) throws
IOException {
+ @Override public void init(final DefaultMessageStore defaultMessageStore)
throws IOException {
this.epochCache = new
EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
this.epochCache.initCacheFromFile();
this.defaultMessageStore = defaultMessageStore;
- this.acceptSocketService =
- new
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ this.acceptSocketService = new
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService(this,
defaultMessageStore);
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()
== BrokerRole.SLAVE) {
this.haClient = new AutoSwitchHAClient(this, defaultMessageStore,
this.epochCache);
@@ -63,8 +70,15 @@ public class AutoSwitchHAService extends DefaultHAService {
this.haConnectionStateNotificationService = new
HAConnectionStateNotificationService(this, defaultMessageStore);
}
- @Override
- public boolean changeToMaster(int masterEpoch) {
+ @Override public void shutdown() {
+ super.shutdown();
+ if (this.haClient != null) {
+ this.haClient.shutdown();
+ }
+ this.executorService.shutdown();
+ }
+
+ @Override public boolean changeToMaster(int masterEpoch) {
final int lastEpoch = this.epochCache.lastEpoch();
if (masterEpoch <= lastEpoch) {
return false;
@@ -89,12 +103,15 @@ public class AutoSwitchHAService extends DefaultHAService {
this.epochCache.appendEntry(newEpochEntry);
this.defaultMessageStore.recoverTopicQueueTable();
+
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add(this.localAddress);
+ setSyncStateSet(newSyncStateSet);
LOGGER.info("Change ha to master success, newMasterEpoch:{},
startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
}
- @Override
- public boolean changeToSlave(String newMasterAddr, int newMasterEpoch,
Long slaveId) {
+ @Override public boolean changeToSlave(String newMasterAddr, int
newMasterEpoch, Long slaveId) {
final int lastEpoch = this.epochCache.lastEpoch();
if (newMasterEpoch <= lastEpoch) {
return false;
@@ -118,28 +135,93 @@ public class AutoSwitchHAService extends DefaultHAService
{
}
}
- @Override
- public void setSyncStateSet(final Set<String> syncStateSet) {
- this.syncStateSet = new HashSet<>(syncStateSet);
+ @Override public HAClient getHAClient() {
+ return this.haClient;
}
- @Override
- public Set<String> getLatestSyncStateSet() {
- final HashSet<String> newSyncStateSet = new
HashSet<>(this.connectionList.size());
- final long masterOffset = this.defaultMessageStore.getMaxPhyOffset();
- for (HAConnection connection : this.connectionList) {
- if (isInSyncSlave(masterOffset, connection)) {
- final String slaveAddress = ((AutoSwitchHAConnection)
connection).getSlaveAddress();
- if (StringUtils.isNoneEmpty(slaveAddress)) {
- newSyncStateSet.add(slaveAddress);
+ @Override public void updateHaMasterAddress(String newAddr) {
+ if (this.haClient != null) {
+ this.haClient.updateHaMasterAddress(newAddr);
+ }
+ }
+
+ @Override public void updateMasterAddress(String newAddr) {
+ }
+
+ public void registerSyncStateSetChangedListener(final
Consumer<Set<String>> listener) {
+ this.syncStateSetChangedListeners.add(listener);
+ }
+
+ public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
+ this.executorService.submit(() -> {
+ for (Consumer<Set<String>> listener :
syncStateSetChangedListeners) {
+ listener.accept(newSyncStateSet);
+ }
+ });
+ }
+
+ /**
+ * Check and maybe shrink the inSyncStateSet.
+ * A slave will be removed from inSyncStateSet if (curTime -
HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
+ */
+ public Set<String> maybeShrinkInSyncStateSet() {
+ final Set<String> currentSyncStateSet = getSyncStateSet();
+ final HashSet<String> newSyncStateSet = new
HashSet<>(currentSyncStateSet);
+ final long haMaxTimeSlaveNotCatchup =
this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
+ for (HAConnection haConnection : this.connectionList) {
+ final AutoSwitchHAConnection connection = (AutoSwitchHAConnection)
haConnection;
+ final String slaveAddress = connection.getSlaveAddress();
+ if (currentSyncStateSet.contains(slaveAddress)) {
+ if ((System.currentTimeMillis() -
connection.getLastCatchUpTimeMs()) > haMaxTimeSlaveNotCatchup) {
+ newSyncStateSet.remove(slaveAddress);
}
}
}
return newSyncStateSet;
}
- @Override public HAClient getHAClient() {
- return this.haClient;
+ /**
+ * Check and maybe add the slave to inSyncStateSet.
+ * A slave will be added to inSyncStateSet if its slaveMaxOffset >=
current confirmOffset, and it is caught up to
+ * an offset within the current leader epoch.
+ */
+ public void maybeExpandInSyncStateSet(final String slaveAddress, final
long slaveMaxOffset) {
+ final Set<String> currentSyncStateSet = getSyncStateSet();
+ if (currentSyncStateSet.contains(slaveAddress)) {
+ return;
+ }
+ final long confirmOffset = getConfirmOffset();
+ if (slaveMaxOffset >= confirmOffset) {
+ final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
+ if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
+ currentSyncStateSet.add(slaveAddress);
+ // Notify the upper layer that syncStateSet changed.
+ notifySyncStateSetChanged(currentSyncStateSet);
+ }
+ }
+ }
+
+ /**
+ * Get confirm offset (min slaveAckOffset of all syncStateSet members)
+ */
+ public long getConfirmOffset() {
+ final Set<String> currentSyncStateSet = getSyncStateSet();
+ long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+ for (HAConnection connection : this.connectionList) {
+ if (currentSyncStateSet.contains(connection.getClientAddress())) {
+ confirmOffset = Math.min(confirmOffset,
connection.getSlaveAckOffset());
+ }
+ }
+ return confirmOffset;
+ }
+
+ public synchronized void setSyncStateSet(final Set<String> syncStateSet) {
+ this.syncStateSet.clear();
+ this.syncStateSet.addAll(syncStateSet);
+ }
+
+ public synchronized Set<String> getSyncStateSet() {
+ return new HashSet<>(this.syncStateSet);
}
public void truncateEpochFilePrefix(final long offset) {
@@ -150,6 +232,10 @@ public class AutoSwitchHAService extends DefaultHAService {
this.epochCache.truncateSuffixByOffset(offset);
}
+ public void setLocalAddress(String localAddress) {
+ this.localAddress = localAddress;
+ }
+
/**
* Try to truncate incomplete msg transferred from master.
*/
@@ -201,36 +287,6 @@ public class AutoSwitchHAService extends DefaultHAService {
return reputFromOffset;
}
- /**
- * Get confirm offset (min slaveAckOffset of all syncStateSet)
- */
- public long getConfirmOffset() {
- if (this.syncStateSet != null) {
- final HashSet<String> syncStateSetCopy = new
HashSet<>(this.syncStateSet);
- long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
- for (HAConnection connection : this.connectionList) {
- if (syncStateSetCopy.contains(connection.getClientAddress())) {
- confirmOffset = Math.min(confirmOffset,
connection.getSlaveAckOffset());
- }
- }
- return confirmOffset;
- }
- return -1;
- }
-
- public void setLocalAddress(String localAddress) {
- this.localAddress = localAddress;
- }
-
- @Override public void updateHaMasterAddress(String newAddr) {
- if (this.haClient != null) {
- this.haClient.updateHaMasterAddress(newAddr);
- }
- }
-
- @Override public void updateMasterAddress(String newAddr) {
- }
-
class AutoSwitchAcceptSocketService extends AcceptSocketService {
public AutoSwitchAcceptSocketService(int port) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 8f74fbbb0..777a62036 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,8 +21,10 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,6 +33,8 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -116,6 +120,39 @@ public class AutoSwitchHATest {
messageStore3.start();
}
+ public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws
Exception {
+ QUEUE_TOTAL = 1;
+ MessageBody = StoreMessage.getBytes();
+ StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+ BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"),
0);
+ storeConfig1 = new MessageStoreConfig();
+ storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+ storeConfig1.setStorePathRootDir(storePathRootDir + File.separator +
"broker1");
+ storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator +
"broker1" + File.separator + "commitlog");
+ storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator +
"broker1" + File.separator + "EpochFileCache");
+ storeConfig1.setAllAckInSyncStateSet(allAckInSyncStateSet);
+ buildMessageStoreConfig(storeConfig1, mappedFileSize);
+ this.store1HaAddress = "127.0.0.1:10912";
+
+ storeConfig2 = new MessageStoreConfig();
+ storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+ storeConfig2.setStorePathRootDir(storePathRootDir + File.separator +
"broker2");
+ storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator +
"broker2" + File.separator + "commitlog");
+ storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator +
"broker2" + File.separator + "EpochFileCache");
+ storeConfig2.setHaListenPort(10943);
+ storeConfig2.setAllAckInSyncStateSet(allAckInSyncStateSet);
+ buildMessageStoreConfig(storeConfig2, mappedFileSize);
+ this.store2HaAddress = "127.0.0.1:10943";
+
+ messageStore1 = buildMessageStore(storeConfig1, 0L);
+ messageStore2 = buildMessageStore(storeConfig2, 1L);
+
+ assertTrue(messageStore1.load());
+ assertTrue(messageStore2.load());
+ messageStore1.start();
+ messageStore2.start();
+ }
+
private void changeMasterAndPutMessage(DefaultMessageStore master,
MessageStoreConfig masterConfig,
DefaultMessageStore slave, long slaveId, MessageStoreConfig
slaveConfig, int epoch, String masterHaAddress,
int totalPutMessageNums) throws Exception {
@@ -147,6 +184,37 @@ public class AutoSwitchHATest {
}
}
+ @Test
+ public void testOptionAllAckInSyncStateSet() throws Exception {
+ init(defaultMappedFileSize, true);
+ AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
+ ((AutoSwitchHAService)
this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService)
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+ ((AutoSwitchHAService)
this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet)
-> {
+ System.out.println("Get newSyncStateSet:" + newSyncStateSet);
+ syncStateSet.set(newSyncStateSet);
+ });
+
+ changeMasterAndPutMessage(this.messageStore1, this.storeConfig1,
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+ Thread.sleep(1000);
+ checkMessage(this.messageStore2, 10, 0);
+
+ Thread.sleep(1000);
+ // Check syncStateSet
+ final Set<String> result = syncStateSet.get();
+ assertTrue(result.contains("127.0.0.1:8000"));
+ assertTrue(result.contains("127.0.0.1:8001"));
+
+ // Now, shutdown store2
+ this.messageStore2.shutdown();
+ this.messageStore2.destroy();
+
+ ((AutoSwitchHAService)
this.messageStore1.getHaService()).setSyncStateSet(result);
+
+ final PutMessageResult putMessageResult =
this.messageStore1.putMessage(buildMessage());
+ assertEquals(putMessageResult.getPutMessageStatus(),
PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+ }
+
@Test
public void testChangeRoleManyTimes() throws Exception {
// Step1, change store1 to master, store2 to follower
@@ -292,12 +360,18 @@ public class AutoSwitchHATest {
@After
public void destroy() throws Exception {
Thread.sleep(5000L);
- messageStore2.shutdown();
- messageStore2.destroy();
- messageStore1.shutdown();
- messageStore1.destroy();
- messageStore3.shutdown();
- messageStore3.destroy();
+ if (this.messageStore2 != null) {
+ messageStore2.shutdown();
+ messageStore2.destroy();
+ }
+ if (this.messageStore1 != null) {
+ messageStore1.shutdown();
+ messageStore1.destroy();
+ }
+ if (this.messageStore3 != null) {
+ messageStore3.shutdown();
+ messageStore3.destroy();
+ }
File file = new File(storePathRootParentDir);
UtilAll.deleteFile(file);
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCacheTest.java
similarity index 98%
rename from
store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
rename to
store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCacheTest.java
index aadf683a2..5fa4891ca 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCacheTest.java
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.ha;
+package org.apache.rocketmq.store.ha.autoswitch;
import java.io.File;
import java.nio.file.Paths;
import org.apache.rocketmq.common.EpochEntry;
-import org.apache.rocketmq.store.ha.autoswitch.EpochFileCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;