This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ba4cfaa [IOTDB-1118] [Distributed] Limit query log lag in mid
consistency level (#2766)
ba4cfaa is described below
commit ba4cfaae2ff103a2035a9d0be164c7f9f40d2741
Author: wangchao316 <[email protected]>
AuthorDate: Sat Mar 6 11:10:07 2021 +0800
[IOTDB-1118] [Distributed] Limit query log lag in mid consistency level
(#2766)
---
.../resources/conf/iotdb-cluster.properties | 6 +-
.../apache/iotdb/cluster/config/ClusterConfig.java | 17 +-
.../iotdb/cluster/config/ClusterDescriptor.java | 4 +
.../exception/CheckConsistencyException.java | 4 +
.../apache/iotdb/cluster/metadata/CMManager.java | 28 ++-
.../apache/iotdb/cluster/metadata/MetaPuller.java | 11 +-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 6 +-
.../iotdb/cluster/server/DataClusterServer.java | 6 +-
.../iotdb/cluster/server/member/RaftMember.java | 136 +++++++++----
.../cluster/log/applier/DataLogApplierTest.java | 3 +-
.../cluster/server/member/DataGroupMemberTest.java | 2 +-
.../iotdb/cluster/server/member/MemberTest.java | 215 ++++++++++++++-------
.../cluster/server/member/MetaGroupMemberTest.java | 2 +-
13 files changed, 310 insertions(+), 130 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 1b55407..62aa133 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -153,4 +153,8 @@ enable_use_persist_log_on_disk_to_catch_up=true
# The number of logs read on the disk at one time, which is mainly used to
control the memory usage.
# This value multiplied by the log size is about the amount of memory used to
read logs from the disk at one time.
-max_number_of_logs_per_fetch_on_disk=1000
\ No newline at end of file
+max_number_of_logs_per_fetch_on_disk=1000
+
+# When consistency level is set to mid, query will fail if the log lag exceeds
max_read_log_lag
+# This default value is 1000
+max_read_log_lag=1000
\ No newline at end of file
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c7efacb..6fe99dc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -147,16 +147,17 @@ public class ClusterConfig {
*/
private boolean waitForSlowNode = true;
+ /**
+ * When consistency level is set to mid, query will fail if the log lag
exceeds max_read_log_lag.
+ */
+ private long maxReadLogLag = 1000L;
+
private boolean openServerRpcPort = false;
public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
- public void setSelectorNumOfClientPool(int selectorNumOfClientPool) {
- this.selectorNumOfClientPool = selectorNumOfClientPool;
- }
-
public int getMaxClientPerNodePerMember() {
return maxClientPerNodePerMember;
}
@@ -417,8 +418,12 @@ public class ClusterConfig {
return waitForSlowNode;
}
- public void setWaitForSlowNode(boolean waitForSlowNode) {
- this.waitForSlowNode = waitForSlowNode;
+ public long getMaxReadLogLag() {
+ return maxReadLogLag;
+ }
+
+ public void setMaxReadLogLag(long maxReadLogLag) {
+ this.maxReadLogLag = maxReadLogLag;
}
public String getInternalIp() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 1fcdd30..631662f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -278,6 +278,10 @@ public class ClusterDescriptor {
"enable_use_persist_log_on_disk_to_catch_up",
String.valueOf(config.isEnableUsePersistLogOnDiskToCatchUp()))));
+ config.setMaxReadLogLag(
+ Long.parseLong(
+ properties.getProperty("max_read_log_lag",
String.valueOf(config.getMaxReadLogLag()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
index 55b9722..5f3bca0 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
@@ -30,4 +30,8 @@ public class CheckConsistencyException extends Exception {
public static final CheckConsistencyException
CHECK_STRONG_CONSISTENCY_EXCEPTION =
new CheckConsistencyException("strong consistency, sync with leader
failed");
+
+ public static final CheckConsistencyException
CHECK_MID_CONSISTENCY_EXCEPTION =
+ new CheckConsistencyException(
+ "mid consistency, localAppliedId is smaller than the
leaderCommitId");
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 6cbf475..6eae80a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -813,9 +813,13 @@ public class CMManager extends MManager {
private void pullTimeSeriesSchemas(PartitionGroup partitionGroup,
List<String> prefixPaths) {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the node is in the target group, synchronize with leader should be
enough
- metaGroupMember
- .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries of
" + prefixPaths)
- .syncLeader();
+ try {
+ metaGroupMember
+ .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries
of " + prefixPaths)
+ .syncLeader(null);
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
return;
}
@@ -1062,7 +1066,11 @@ public class CMManager extends MManager {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after
synchronizing with the
// leader
-
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+ try {
+
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader(null);
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
List<PartialPath> allTimeseriesName =
getMatchedPathsLocally(pathUnderSG, withAlias);
logger.debug(
"{}: get matched paths of {} locally, result {}",
@@ -1198,7 +1206,11 @@ public class CMManager extends MManager {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after
synchronizing with the
// leader
-
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+ try {
+
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader(null);
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
Set<PartialPath> allDevices = getDevices(pathUnderSG);
logger.debug(
"{}: get matched paths of {} locally, result {}",
@@ -1794,7 +1806,11 @@ public class CMManager extends MManager {
try {
return super.getStorageGroupPath(path);
} catch (StorageGroupNotSetException e) {
- metaGroupMember.syncLeader();
+ try {
+ metaGroupMember.syncLeader(null);
+ } catch (CheckConsistencyException ex) {
+ logger.warn("Failed to check consistency.", e);
+ }
return super.getStorageGroupPath(path);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e98b67c..260c3c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
@@ -130,9 +131,13 @@ public class MetaPuller {
List<MeasurementSchema> results) {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the node is in the target group, synchronize with leader should be
enough
- metaGroupMember
- .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries of
" + prefixPaths)
- .syncLeader();
+ try {
+ metaGroupMember
+ .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries
of " + prefixPaths)
+ .syncLeader(null);
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
int preSize = results.size();
for (PartialPath prefixPath : prefixPaths) {
IoTDB.metaManager.collectSeries(prefixPath, results);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index e76be6c..2238546 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -502,7 +502,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
@Override
protected List<StorageGroupMNode> getAllStorageGroupNodes() {
- metaGroupMember.syncLeader();
+ try {
+ metaGroupMember.syncLeader(null);
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
return IoTDB.metaManager.getAllStorageGroupNodes();
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 9f0f098..012569e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -552,7 +552,11 @@ public class DataClusterServer extends RaftServer
}
private void removeMember(Node header, DataGroupMember dataGroupMember) {
- dataGroupMember.syncLeader();
+ try {
+ dataGroupMember.syncLeader(null);
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
dataGroupMember.setReadOnly();
dataGroupMember.stop();
stoppedMemberManager.put(header, dataGroupMember);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 54a632e..8783e6c 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -107,7 +107,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember {
-
+ private static final Logger logger =
LoggerFactory.getLogger(RaftMember.class);
public static final boolean USE_LOG_DISPATCHER = false;
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time
out";
@@ -115,7 +115,6 @@ public abstract class RaftMember {
"{}: encountered an error when forwarding {} to" + " {}";
private static final String MSG_NO_LEADER_COMMIT_INDEX =
"{}: Cannot request commit index from {}";
- private static final Logger logger =
LoggerFactory.getLogger(RaftMember.class);
private static final String MSG_NO_LEADER_IN_SYNC = "{}: No leader is found
when synchronizing";
public static final String MSG_LOG_IS_ACCEPTED = "{}: log {} is accepted";
/**
@@ -279,7 +278,7 @@ public abstract class RaftMember {
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 10,
new ThreadFactoryBuilder().setNameFormat(getName() +
"-AppendLog%d").build());
- if (!ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (!config.isUseAsyncServer()) {
serialToParallelPool =
new ThreadPoolExecutor(
allNodes.size(),
@@ -731,19 +730,16 @@ public abstract class RaftMember {
public void syncLeaderWithConsistencyCheck(boolean isWriteRequest)
throws CheckConsistencyException {
if (isWriteRequest) {
- if (!syncLeader()) {
- throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
- }
+ syncLeader(new StrongCheckConsistency());
} else {
- switch
(ClusterDescriptor.getInstance().getConfig().getConsistencyLevel()) {
+ switch (config.getConsistencyLevel()) {
case STRONG_CONSISTENCY:
- if (!syncLeader()) {
- throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
- }
+ syncLeader(new StrongCheckConsistency());
return;
case MID_CONSISTENCY:
- // do not care success or not
- syncLeader();
+ // if leaderCommitId bigger than localAppliedId a value,
+ // will throw CHECK_MID_CONSISTENCY_EXCEPTION
+ syncLeader(new MidCheckConsistency());
return;
case WEAK_CONSISTENCY:
// do nothing
@@ -751,8 +747,64 @@ public abstract class RaftMember {
default:
// this should not happen in theory
throw new CheckConsistencyException(
- "unknown consistency="
- +
ClusterDescriptor.getInstance().getConfig().getConsistencyLevel().name());
+ "unknown consistency=" + config.getConsistencyLevel().name());
+ }
+ }
+ }
+
+ /** call back after syncLeader */
+ public interface CheckConsistency {
+
+ /**
+ * deal leaderCommitId and localAppliedId after syncLeader
+ *
+ * @param leaderCommitId leader commit id
+ * @param localAppliedId local applied id
+ * @throws CheckConsistencyException maybe throw
CheckConsistencyException, which is defined in
+ * implements.
+ */
+ void postCheckConsistency(long leaderCommitId, long localAppliedId)
+ throws CheckConsistencyException;
+ }
+
+ public static class MidCheckConsistency implements CheckConsistency {
+
+ /**
+ * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
+ * CHECK_MID_CONSISTENCY_EXCEPTION
+ *
+ * @param leaderCommitId leader commit id
+ * @param localAppliedId local applied id
+ * @throws CheckConsistencyException
+ */
+ @Override
+ public void postCheckConsistency(long leaderCommitId, long localAppliedId)
+ throws CheckConsistencyException {
+ if (leaderCommitId == Long.MAX_VALUE
+ || leaderCommitId == Long.MIN_VALUE
+ || leaderCommitId - localAppliedId
+ >
ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+ throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
+ }
+ }
+ }
+
+ public static class StrongCheckConsistency implements CheckConsistency {
+
+ /**
+ * if leaderCommitId > localAppliedId, will throw
CHECK_STRONG_CONSISTENCY_EXCEPTION
+ *
+ * @param leaderCommitId leader commit id
+ * @param localAppliedId local applied id
+ * @throws CheckConsistencyException
+ */
+ @Override
+ public void postCheckConsistency(long leaderCommitId, long localAppliedId)
+ throws CheckConsistencyException {
+ if (leaderCommitId > localAppliedId
+ || leaderCommitId == Long.MAX_VALUE
+ || leaderCommitId == Long.MIN_VALUE) {
+ throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
}
}
}
@@ -761,9 +813,12 @@ public abstract class RaftMember {
* Request and check the leader's commitId to see whether this node has
caught up. If not, wait
* until this node catches up.
*
+ * @param checkConsistency check after syncleader
* @return true if the node has caught up, false otherwise
+ * @throws CheckConsistencyException if leaderCommitId bigger than
localAppliedId a threshold
+ * value after timeout
*/
- public boolean syncLeader() {
+ public boolean syncLeader(CheckConsistency checkConsistency) throws
CheckConsistencyException {
if (character == NodeCharacter.LEADER) {
return true;
}
@@ -777,7 +832,7 @@ public abstract class RaftMember {
return true;
}
logger.debug("{}: try synchronizing with the leader {}", name,
leader.get());
- return waitUntilCatchUp();
+ return waitUntilCatchUp(checkConsistency);
}
/** Wait until the leader of this node becomes known or time out. */
@@ -806,32 +861,42 @@ public abstract class RaftMember {
* it.
*
* @return true if this node has caught up before timeout, false otherwise
+ * @throws CheckConsistencyException if leaderCommitId bigger than
localAppliedId a threshold
+ * value after timeout
*/
- private boolean waitUntilCatchUp() {
- long startTime = System.currentTimeMillis();
- long waitedTime = 0;
- long leaderCommitId;
+ protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
+ throws CheckConsistencyException {
+ long leaderCommitId = Long.MIN_VALUE;
try {
- leaderCommitId =
- ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()
- ? requestCommitIdAsync()
- : requestCommitIdSync();
- if (leaderCommitId == Long.MAX_VALUE) {
- // Long.MAX_VALUE representing there is a network issue
- return false;
- }
+ leaderCommitId = config.isUseAsyncServer() ? requestCommitIdAsync() :
requestCommitIdSync();
+ return syncLocalApply(leaderCommitId);
} catch (TException e) {
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
- return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
- return false;
+ } finally {
+ if (checkConsistency != null) {
+ checkConsistency.postCheckConsistency(
+ leaderCommitId, logManager.getMaxHaveAppliedCommitIndex());
+ }
}
+ return false;
+ }
+ /**
+ * sync local applyId to leader commitId
+ *
+ * @param leaderCommitId leader commit id
+ * @return true if leaderCommitId <= localAppliedId
+ */
+ private boolean syncLocalApply(long leaderCommitId) {
+ long startTime = System.currentTimeMillis();
+ long waitedTime = 0;
+ long localAppliedId = 0;
while (waitedTime < RaftServer.getSyncLeaderMaxWaitMs()) {
try {
- long localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
+ localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
logger.debug("{}: synchronizing commitIndex {}/{}", name,
localAppliedId, leaderCommitId);
if (leaderCommitId <= localAppliedId) {
// this node has caught up
@@ -854,6 +919,7 @@ public abstract class RaftMember {
}
}
logger.warn("{}: Failed to synchronize with the leader after {}ms", name,
waitedTime);
+
return false;
}
@@ -984,7 +1050,7 @@ public abstract class RaftMember {
}
@SuppressWarnings("java:S2274") // enable timeout
- private long requestCommitIdAsync() throws TException, InterruptedException {
+ protected long requestCommitIdAsync() throws TException,
InterruptedException {
// use Long.MAX_VALUE to indicate a timeout
AtomicReference<Long> commitIdResult = new
AtomicReference<>(Long.MAX_VALUE);
AsyncClient client = getAsyncClient(leader.get());
@@ -1152,7 +1218,7 @@ public abstract class RaftMember {
logger.debug("{}: Forward {} to node {}", name, plan, node);
TSStatus status;
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (config.isUseAsyncServer()) {
status = forwardPlanAsync(plan, node, header);
} else {
status = forwardPlanSync(plan, node, header);
@@ -1640,7 +1706,7 @@ public abstract class RaftMember {
return;
}
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (config.isUseAsyncServer()) {
sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm,
request, peer);
} else {
sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm,
request, peer);
@@ -1653,7 +1719,7 @@ public abstract class RaftMember {
*/
@SuppressWarnings("java:S2445") // safe synchronized
public boolean waitForPrevLog(Peer peer, Log log) {
- final int maxLogDiff =
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+ final int maxLogDiff = config.getMaxNumOfLogsInMem();
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
// if the peer falls behind too much, wait until it catches up, otherwise
there may be too
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index ff156e8..af6272e 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
@@ -91,7 +92,7 @@ public class DataLogApplierTest extends IoTDBTest {
private TestMetaGroupMember testMetaGroupMember =
new TestMetaGroupMember() {
@Override
- public boolean syncLeader() {
+ public boolean syncLeader(RaftMember.CheckConsistency
checkConsistency) {
return true;
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 8cab2c1..84d069a 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -183,7 +183,7 @@ public class DataGroupMemberTest extends MemberTest {
DataGroupMember dataGroupMember =
new DataGroupMember(new Factory(), nodes, node, testMetaMember) {
@Override
- public boolean syncLeader() {
+ public boolean syncLeader(CheckConsistency checkConsistency) {
return true;
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 724b565..55ee3f3 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -63,6 +64,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -95,6 +97,9 @@ public class MemberTest {
private boolean prevUseAsyncApplier;
private boolean prevEnableWAL;
+ private int syncLeaderMaxWait;
+ private long heartBeatInterval;
+
@Before
public void setUp() throws Exception {
prevUseAsyncApplier =
ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier();
@@ -109,6 +114,12 @@ public class MemberTest {
IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
RaftMember.setWaitLeaderTimeMs(10);
+ syncLeaderMaxWait = RaftServer.getSyncLeaderMaxWaitMs();
+ heartBeatInterval = RaftServer.getHeartBeatIntervalMs();
+
+ RaftServer.setSyncLeaderMaxWaitMs(100);
+ RaftServer.setHeartBeatIntervalMs(100);
+
allNodes = new PartitionGroup();
for (int i = 0; i < 100; i += 10) {
allNodes.add(TestUtils.getNode(i));
@@ -183,6 +194,9 @@ public class MemberTest {
ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(preLogBufferSize);
ClusterDescriptor.getInstance().getConfig().setUseAsyncApplier(prevUseAsyncApplier);
IoTDBDescriptor.getInstance().getConfig().setEnableWal(prevEnableWAL);
+
+ RaftServer.setSyncLeaderMaxWaitMs(syncLeaderMaxWait);
+ RaftServer.setHeartBeatIntervalMs(heartBeatInterval);
}
DataGroupMember getDataGroupMember(Node node) {
@@ -194,7 +208,7 @@ public class MemberTest {
new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
@Override
- public boolean syncLeader() {
+ public boolean syncLeader(RaftMember.CheckConsistency
checkConsistency) {
return true;
}
@@ -304,16 +318,21 @@ public class MemberTest {
return ret;
}
- private DataGroupMember newDataGroupMemberWithSyncLeader(Node node, boolean
syncLeader) {
+ private DataGroupMember newDataGroupMemberWithSyncLeaderFalse(Node node,
boolean syncLeader) {
DataGroupMember newMember =
new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
@Override
- public boolean syncLeader() {
+ public boolean syncLeader(RaftMember.CheckConsistency
checkConsistency) {
return syncLeader;
}
@Override
+ protected long requestCommitIdAsync() {
+ return 5;
+ }
+
+ @Override
public long appendEntry(AppendEntryRequest request) {
return Response.RESPONSE_AGREE;
}
@@ -336,121 +355,169 @@ public class MemberTest {
return newMember;
}
- @Test
- public void testsyncLeaderWithConsistencyCheck() {
+ private DataGroupMember newDataGroupMemberWithSyncLeaderTrue(Node node,
boolean syncLeader) {
+ DataGroupMember newMember =
+ new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
+
+ @Override
+ public boolean syncLeader(RaftMember.CheckConsistency
checkConsistency) {
+ return syncLeader;
+ }
+
+ @Override
+ protected long requestCommitIdAsync() {
+ return 1000L;
+ }
+
+ @Override
+ public long appendEntry(AppendEntryRequest request) {
+ return Response.RESPONSE_AGREE;
+ }
+ @Override
+ public AsyncClient getAsyncClient(Node node) {
+ try {
+ return new TestAsyncDataClient(node, dataGroupMemberMap);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+ };
+ newMember.setThisNode(node);
+ newMember.setMetaGroupMember(testMetaMember);
+ newMember.setLeader(node);
+ newMember.setCharacter(NodeCharacter.LEADER);
+ newMember.setLogManager(new TestPartitionedLogManager());
+ newMember.setAppendLogThreadPool(testThreadPool);
+ return newMember;
+ }
+
+ @Test
+ public void testsyncLeaderStrongConsistencyCheckFalse() {
// 1. write request : Strong consistency level with syncLeader false
DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+ newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
ClusterDescriptor.getInstance()
.getConfig()
- .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
- CheckConsistencyException exception = null;
+ .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
try {
-
dataGroupMemberWithWriteStrongConsistencyFalse.syncLeaderWithConsistencyCheck(true);
+ dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(
+ new RaftMember.StrongCheckConsistency());
} catch (CheckConsistencyException e) {
- exception = e;
+ Assert.assertNotNull(e);
+
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION,
e);
}
- Assert.assertNotNull(exception);
-
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION,
exception);
+ }
- // 2. write request : Strong consistency level with syncLeader true
+ @Test
+ public void testsyncLeaderStrongConsistencyCheckTrue() {
+ // 1. write request : Strong consistency level with syncLeader false
DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
- ClusterDescriptor.getInstance()
- .getConfig()
- .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
- exception = null;
- try {
-
dataGroupMemberWithWriteStrongConsistencyTrue.syncLeaderWithConsistencyCheck(true);
- } catch (CheckConsistencyException e) {
- exception = e;
- }
- Assert.assertNull(exception);
-
- // 3. Strong consistency level with syncLeader false
- DataGroupMember dataGroupMemberWithStrongConsistencyFalse =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+ newDataGroupMemberWithSyncLeaderTrue(TestUtils.getNode(0), false);
ClusterDescriptor.getInstance()
.getConfig()
.setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
- exception = null;
try {
-
dataGroupMemberWithStrongConsistencyFalse.syncLeaderWithConsistencyCheck(false);
- } catch (CheckConsistencyException e) {
- exception = e;
- }
- Assert.assertNotNull(exception);
-
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION,
exception);
- // 4. Strong consistency level with syncLeader true
- DataGroupMember dataGroupMemberWithStrongConsistencyTrue =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
- ClusterDescriptor.getInstance()
- .getConfig()
- .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
- exception = null;
- try {
-
dataGroupMemberWithStrongConsistencyTrue.syncLeaderWithConsistencyCheck(false);
+ PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+ Mockito.mock(PartitionedSnapshotLogManager.class);
+
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(1000L);
+
dataGroupMemberWithWriteStrongConsistencyTrue.setLogManager(partitionedSnapshotLogManager);
+
+ dataGroupMemberWithWriteStrongConsistencyTrue.waitUntilCatchUp(
+ new RaftMember.StrongCheckConsistency());
} catch (CheckConsistencyException e) {
- exception = e;
+ Assert.fail();
}
- Assert.assertNull(exception);
+ }
- // 5. Mid consistency level with syncLeader false
- DataGroupMember dataGroupMemberWithMidConsistencyFalse =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+ @Test
+ public void testsyncLeaderMidConsistencyCheckFalse() {
+ // 1. write request : Strong consistency level with syncLeader false
+ DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
+ newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
ClusterDescriptor.getInstance()
.getConfig()
.setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
- exception = null;
+ ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(1);
try {
-
dataGroupMemberWithMidConsistencyFalse.syncLeaderWithConsistencyCheck(false);
+
+ PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+ Mockito.mock(PartitionedSnapshotLogManager.class);
+
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(-2L);
+
dataGroupMemberWithWriteStrongConsistencyFalse.setLogManager(partitionedSnapshotLogManager);
+
+ dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(
+ new RaftMember.MidCheckConsistency());
} catch (CheckConsistencyException e) {
- exception = e;
+
Assert.assertEquals(CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION,
e);
}
- Assert.assertNull(exception);
+ }
- // 6. Mid consistency level with syncLeader true
- DataGroupMember dataGroupMemberWithMidConsistencyTrue =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
+ @Test
+ public void testsyncLeaderMidConsistencyCheckTrue() {
+ // 1. write request : Strong consistency level with syncLeader false
+ DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue =
+ newDataGroupMemberWithSyncLeaderTrue(TestUtils.getNode(0), false);
ClusterDescriptor.getInstance()
.getConfig()
.setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
- exception = null;
+ ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(500);
try {
-
dataGroupMemberWithMidConsistencyTrue.syncLeaderWithConsistencyCheck(false);
+
+ PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+ Mockito.mock(PartitionedSnapshotLogManager.class);
+
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(600L);
+
dataGroupMemberWithWriteStrongConsistencyTrue.setLogManager(partitionedSnapshotLogManager);
+
+ dataGroupMemberWithWriteStrongConsistencyTrue.waitUntilCatchUp(
+ new RaftMember.MidCheckConsistency());
} catch (CheckConsistencyException e) {
- exception = e;
+ Assert.fail();
}
- Assert.assertNull(exception);
+ }
- // 7. Weak consistency level with syncLeader false
- DataGroupMember dataGroupMemberWithWeakConsistencyFalse =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+ @Test
+ public void testsyncLeaderWeakConsistencyCheckFalse() {
+ // 1. write request : Strong consistency level with syncLeader false
+ DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
+ newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
ClusterDescriptor.getInstance()
.getConfig()
.setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
- exception = null;
+ ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(1);
try {
-
dataGroupMemberWithWeakConsistencyFalse.syncLeaderWithConsistencyCheck(false);
+
+ PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+ Mockito.mock(PartitionedSnapshotLogManager.class);
+
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(-2L);
+
dataGroupMemberWithWriteStrongConsistencyFalse.setLogManager(partitionedSnapshotLogManager);
+
+ dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(null);
} catch (CheckConsistencyException e) {
- exception = e;
+ Assert.fail();
}
- Assert.assertNull(exception);
+ }
- // 8. Weak consistency level with syncLeader true
- DataGroupMember dataGroupMemberWithWeakConsistencyTrue =
- newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
+ @Test
+ public void testsyncLeaderWeakConsistencyCheckTrue() {
+ // 1. write request : Strong consistency level with syncLeader false
+ DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue =
+ newDataGroupMemberWithSyncLeaderTrue(TestUtils.getNode(0), false);
ClusterDescriptor.getInstance()
.getConfig()
.setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
- exception = null;
+ ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(500);
try {
-
dataGroupMemberWithWeakConsistencyTrue.syncLeaderWithConsistencyCheck(false);
+
+ PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+ Mockito.mock(PartitionedSnapshotLogManager.class);
+
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(600L);
+
dataGroupMemberWithWriteStrongConsistencyTrue.setLogManager(partitionedSnapshotLogManager);
+
+ dataGroupMemberWithWriteStrongConsistencyTrue.waitUntilCatchUp(null);
} catch (CheckConsistencyException e) {
- exception = e;
+ Assert.fail();
}
- Assert.assertNull(exception);
}
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 1a77abf..08914d1 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -193,7 +193,7 @@ public class MetaGroupMemberTest extends MemberTest {
DataGroupMember dataGroupMember =
new DataGroupMember(null, group, node, testMetaMember) {
@Override
- public boolean syncLeader() {
+ public boolean syncLeader(CheckConsistency checkConsistency) {
return true;
}