This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba4cfaa  [IOTDB-1118] [Distributed] Limit query log lag in mid 
consistency level (#2766)
ba4cfaa is described below

commit ba4cfaae2ff103a2035a9d0be164c7f9f40d2741
Author: wangchao316 <[email protected]>
AuthorDate: Sat Mar 6 11:10:07 2021 +0800

    [IOTDB-1118] [Distributed] Limit query log lag in mid consistency level 
(#2766)
---
 .../resources/conf/iotdb-cluster.properties        |   6 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  17 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |   4 +
 .../exception/CheckConsistencyException.java       |   4 +
 .../apache/iotdb/cluster/metadata/CMManager.java   |  28 ++-
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  11 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |   6 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   6 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 136 +++++++++----
 .../cluster/log/applier/DataLogApplierTest.java    |   3 +-
 .../cluster/server/member/DataGroupMemberTest.java |   2 +-
 .../iotdb/cluster/server/member/MemberTest.java    | 215 ++++++++++++++-------
 .../cluster/server/member/MetaGroupMemberTest.java |   2 +-
 13 files changed, 310 insertions(+), 130 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties 
b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 1b55407..62aa133 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -153,4 +153,8 @@ enable_use_persist_log_on_disk_to_catch_up=true
 
 # The number of logs read on the disk at one time, which is mainly used to 
control the memory usage.
 # This value multiplied by the log size is about the amount of memory used to 
read logs from the disk at one time.
-max_number_of_logs_per_fetch_on_disk=1000
\ No newline at end of file
+max_number_of_logs_per_fetch_on_disk=1000
+
+# When consistency level is set to mid, query will fail if the log lag exceeds 
max_read_log_lag
+# This default value is 1000
+max_read_log_lag=1000
\ No newline at end of file
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c7efacb..6fe99dc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -147,16 +147,17 @@ public class ClusterConfig {
    */
   private boolean waitForSlowNode = true;
 
+  /**
+   * When consistency level is set to mid, query will fail if the log lag 
exceeds max_read_log_lag.
+   */
+  private long maxReadLogLag = 1000L;
+
   private boolean openServerRpcPort = false;
 
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
 
-  public void setSelectorNumOfClientPool(int selectorNumOfClientPool) {
-    this.selectorNumOfClientPool = selectorNumOfClientPool;
-  }
-
   public int getMaxClientPerNodePerMember() {
     return maxClientPerNodePerMember;
   }
@@ -417,8 +418,12 @@ public class ClusterConfig {
     return waitForSlowNode;
   }
 
-  public void setWaitForSlowNode(boolean waitForSlowNode) {
-    this.waitForSlowNode = waitForSlowNode;
+  public long getMaxReadLogLag() {
+    return maxReadLogLag;
+  }
+
+  public void setMaxReadLogLag(long maxReadLogLag) {
+    this.maxReadLogLag = maxReadLogLag;
   }
 
   public String getInternalIp() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 1fcdd30..631662f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -278,6 +278,10 @@ public class ClusterDescriptor {
                 "enable_use_persist_log_on_disk_to_catch_up",
                 
String.valueOf(config.isEnableUsePersistLogOnDiskToCatchUp()))));
 
+    config.setMaxReadLogLag(
+        Long.parseLong(
+            properties.getProperty("max_read_log_lag", 
String.valueOf(config.getMaxReadLogLag()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
index 55b9722..5f3bca0 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
@@ -30,4 +30,8 @@ public class CheckConsistencyException extends Exception {
 
   public static final CheckConsistencyException 
CHECK_STRONG_CONSISTENCY_EXCEPTION =
       new CheckConsistencyException("strong consistency, sync with leader 
failed");
+
+  public static final CheckConsistencyException 
CHECK_MID_CONSISTENCY_EXCEPTION =
+      new CheckConsistencyException(
+          "mid consistency, localAppliedId is smaller than the 
leaderCommitId");
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 6cbf475..6eae80a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -813,9 +813,13 @@ public class CMManager extends MManager {
   private void pullTimeSeriesSchemas(PartitionGroup partitionGroup, 
List<String> prefixPaths) {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the node is in the target group, synchronize with leader should be 
enough
-      metaGroupMember
-          .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries of 
" + prefixPaths)
-          .syncLeader();
+      try {
+        metaGroupMember
+            .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries 
of " + prefixPaths)
+            .syncLeader(null);
+      } catch (CheckConsistencyException e) {
+        logger.warn("Failed to check consistency.", e);
+      }
       return;
     }
 
@@ -1062,7 +1066,11 @@ public class CMManager extends MManager {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after 
synchronizing with the
         // leader
-        
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+        try {
+          
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader(null);
+        } catch (CheckConsistencyException e) {
+          logger.warn("Failed to check consistency.", e);
+        }
         List<PartialPath> allTimeseriesName = 
getMatchedPathsLocally(pathUnderSG, withAlias);
         logger.debug(
             "{}: get matched paths of {} locally, result {}",
@@ -1198,7 +1206,11 @@ public class CMManager extends MManager {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after 
synchronizing with the
         // leader
-        
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+        try {
+          
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader(null);
+        } catch (CheckConsistencyException e) {
+          logger.warn("Failed to check consistency.", e);
+        }
         Set<PartialPath> allDevices = getDevices(pathUnderSG);
         logger.debug(
             "{}: get matched paths of {} locally, result {}",
@@ -1794,7 +1806,11 @@ public class CMManager extends MManager {
     try {
       return super.getStorageGroupPath(path);
     } catch (StorageGroupNotSetException e) {
-      metaGroupMember.syncLeader();
+      try {
+        metaGroupMember.syncLeader(null);
+      } catch (CheckConsistencyException ex) {
+        logger.warn("Failed to check consistency.", e);
+      }
       return super.getStorageGroupPath(path);
     }
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e98b67c..260c3c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
@@ -130,9 +131,13 @@ public class MetaPuller {
       List<MeasurementSchema> results) {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the node is in the target group, synchronize with leader should be 
enough
-      metaGroupMember
-          .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries of 
" + prefixPaths)
-          .syncLeader();
+      try {
+        metaGroupMember
+            .getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries 
of " + prefixPaths)
+            .syncLeader(null);
+      } catch (CheckConsistencyException e) {
+        logger.warn("Failed to check consistency.", e);
+      }
       int preSize = results.size();
       for (PartialPath prefixPath : prefixPaths) {
         IoTDB.metaManager.collectSeries(prefixPath, results);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index e76be6c..2238546 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -502,7 +502,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
 
   @Override
   protected List<StorageGroupMNode> getAllStorageGroupNodes() {
-    metaGroupMember.syncLeader();
+    try {
+      metaGroupMember.syncLeader(null);
+    } catch (CheckConsistencyException e) {
+      logger.warn("Failed to check consistency.", e);
+    }
     return IoTDB.metaManager.getAllStorageGroupNodes();
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 9f0f098..012569e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -552,7 +552,11 @@ public class DataClusterServer extends RaftServer
   }
 
   private void removeMember(Node header, DataGroupMember dataGroupMember) {
-    dataGroupMember.syncLeader();
+    try {
+      dataGroupMember.syncLeader(null);
+    } catch (CheckConsistencyException e) {
+      logger.warn("Failed to check consistency.", e);
+    }
     dataGroupMember.setReadOnly();
     dataGroupMember.stop();
     stoppedMemberManager.put(header, dataGroupMember);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 54a632e..8783e6c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -107,7 +107,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember {
-
+  private static final Logger logger = 
LoggerFactory.getLogger(RaftMember.class);
   public static final boolean USE_LOG_DISPATCHER = false;
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time 
out";
@@ -115,7 +115,6 @@ public abstract class RaftMember {
       "{}: encountered an error when forwarding {} to" + " {}";
   private static final String MSG_NO_LEADER_COMMIT_INDEX =
       "{}: Cannot request commit index from {}";
-  private static final Logger logger = 
LoggerFactory.getLogger(RaftMember.class);
   private static final String MSG_NO_LEADER_IN_SYNC = "{}: No leader is found 
when synchronizing";
   public static final String MSG_LOG_IS_ACCEPTED = "{}: log {} is accepted";
   /**
@@ -279,7 +278,7 @@ public abstract class RaftMember {
         Executors.newFixedThreadPool(
             Runtime.getRuntime().availableProcessors() * 10,
             new ThreadFactoryBuilder().setNameFormat(getName() + 
"-AppendLog%d").build());
-    if (!ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (!config.isUseAsyncServer()) {
       serialToParallelPool =
           new ThreadPoolExecutor(
               allNodes.size(),
@@ -731,19 +730,16 @@ public abstract class RaftMember {
   public void syncLeaderWithConsistencyCheck(boolean isWriteRequest)
       throws CheckConsistencyException {
     if (isWriteRequest) {
-      if (!syncLeader()) {
-        throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
-      }
+      syncLeader(new StrongCheckConsistency());
     } else {
-      switch 
(ClusterDescriptor.getInstance().getConfig().getConsistencyLevel()) {
+      switch (config.getConsistencyLevel()) {
         case STRONG_CONSISTENCY:
-          if (!syncLeader()) {
-            throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
-          }
+          syncLeader(new StrongCheckConsistency());
           return;
         case MID_CONSISTENCY:
-          // do not care success or not
-          syncLeader();
+          // if leaderCommitId bigger than localAppliedId a value,
+          // will throw CHECK_MID_CONSISTENCY_EXCEPTION
+          syncLeader(new MidCheckConsistency());
           return;
         case WEAK_CONSISTENCY:
           // do nothing
@@ -751,8 +747,64 @@ public abstract class RaftMember {
         default:
           // this should not happen in theory
           throw new CheckConsistencyException(
-              "unknown consistency="
-                  + 
ClusterDescriptor.getInstance().getConfig().getConsistencyLevel().name());
+              "unknown consistency=" + config.getConsistencyLevel().name());
+      }
+    }
+  }
+
+  /** call back after syncLeader */
+  public interface CheckConsistency {
+
+    /**
+     * deal leaderCommitId and localAppliedId after syncLeader
+     *
+     * @param leaderCommitId leader commit id
+     * @param localAppliedId local applied id
+     * @throws CheckConsistencyException maybe throw 
CheckConsistencyException, which is defined in
+     *     implements.
+     */
+    void postCheckConsistency(long leaderCommitId, long localAppliedId)
+        throws CheckConsistencyException;
+  }
+
+  public static class MidCheckConsistency implements CheckConsistency {
+
+    /**
+     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
+     * CHECK_MID_CONSISTENCY_EXCEPTION
+     *
+     * @param leaderCommitId leader commit id
+     * @param localAppliedId local applied id
+     * @throws CheckConsistencyException
+     */
+    @Override
+    public void postCheckConsistency(long leaderCommitId, long localAppliedId)
+        throws CheckConsistencyException {
+      if (leaderCommitId == Long.MAX_VALUE
+          || leaderCommitId == Long.MIN_VALUE
+          || leaderCommitId - localAppliedId
+              > 
ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+        throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
+      }
+    }
+  }
+
+  public static class StrongCheckConsistency implements CheckConsistency {
+
+    /**
+     * if leaderCommitId > localAppliedId, will throw 
CHECK_STRONG_CONSISTENCY_EXCEPTION
+     *
+     * @param leaderCommitId leader commit id
+     * @param localAppliedId local applied id
+     * @throws CheckConsistencyException
+     */
+    @Override
+    public void postCheckConsistency(long leaderCommitId, long localAppliedId)
+        throws CheckConsistencyException {
+      if (leaderCommitId > localAppliedId
+          || leaderCommitId == Long.MAX_VALUE
+          || leaderCommitId == Long.MIN_VALUE) {
+        throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
       }
     }
   }
@@ -761,9 +813,12 @@ public abstract class RaftMember {
    * Request and check the leader's commitId to see whether this node has 
caught up. If not, wait
    * until this node catches up.
    *
+   * @param checkConsistency check after syncleader
    * @return true if the node has caught up, false otherwise
+   * @throws CheckConsistencyException if leaderCommitId bigger than 
localAppliedId a threshold
+   *     value after timeout
    */
-  public boolean syncLeader() {
+  public boolean syncLeader(CheckConsistency checkConsistency) throws 
CheckConsistencyException {
     if (character == NodeCharacter.LEADER) {
       return true;
     }
@@ -777,7 +832,7 @@ public abstract class RaftMember {
       return true;
     }
     logger.debug("{}: try synchronizing with the leader {}", name, 
leader.get());
-    return waitUntilCatchUp();
+    return waitUntilCatchUp(checkConsistency);
   }
 
   /** Wait until the leader of this node becomes known or time out. */
@@ -806,32 +861,42 @@ public abstract class RaftMember {
    * it.
    *
    * @return true if this node has caught up before timeout, false otherwise
+   * @throws CheckConsistencyException if leaderCommitId bigger than 
localAppliedId a threshold
+   *     value after timeout
    */
-  private boolean waitUntilCatchUp() {
-    long startTime = System.currentTimeMillis();
-    long waitedTime = 0;
-    long leaderCommitId;
+  protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
+      throws CheckConsistencyException {
+    long leaderCommitId = Long.MIN_VALUE;
     try {
-      leaderCommitId =
-          ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()
-              ? requestCommitIdAsync()
-              : requestCommitIdSync();
-      if (leaderCommitId == Long.MAX_VALUE) {
-        // Long.MAX_VALUE representing there is a network issue
-        return false;
-      }
+      leaderCommitId = config.isUseAsyncServer() ? requestCommitIdAsync() : 
requestCommitIdSync();
+      return syncLocalApply(leaderCommitId);
     } catch (TException e) {
       logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
-      return false;
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
-      return false;
+    } finally {
+      if (checkConsistency != null) {
+        checkConsistency.postCheckConsistency(
+            leaderCommitId, logManager.getMaxHaveAppliedCommitIndex());
+      }
     }
+    return false;
+  }
 
+  /**
+   * sync local applyId to leader commitId
+   *
+   * @param leaderCommitId leader commit id
+   * @return true if leaderCommitId <= localAppliedId
+   */
+  private boolean syncLocalApply(long leaderCommitId) {
+    long startTime = System.currentTimeMillis();
+    long waitedTime = 0;
+    long localAppliedId = 0;
     while (waitedTime < RaftServer.getSyncLeaderMaxWaitMs()) {
       try {
-        long localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
+        localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
         logger.debug("{}: synchronizing commitIndex {}/{}", name, 
localAppliedId, leaderCommitId);
         if (leaderCommitId <= localAppliedId) {
           // this node has caught up
@@ -854,6 +919,7 @@ public abstract class RaftMember {
       }
     }
     logger.warn("{}: Failed to synchronize with the leader after {}ms", name, 
waitedTime);
+
     return false;
   }
 
@@ -984,7 +1050,7 @@ public abstract class RaftMember {
   }
 
   @SuppressWarnings("java:S2274") // enable timeout
-  private long requestCommitIdAsync() throws TException, InterruptedException {
+  protected long requestCommitIdAsync() throws TException, 
InterruptedException {
     // use Long.MAX_VALUE to indicate a timeout
     AtomicReference<Long> commitIdResult = new 
AtomicReference<>(Long.MAX_VALUE);
     AsyncClient client = getAsyncClient(leader.get());
@@ -1152,7 +1218,7 @@ public abstract class RaftMember {
     logger.debug("{}: Forward {} to node {}", name, plan, node);
 
     TSStatus status;
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (config.isUseAsyncServer()) {
       status = forwardPlanAsync(plan, node, header);
     } else {
       status = forwardPlanSync(plan, node, header);
@@ -1640,7 +1706,7 @@ public abstract class RaftMember {
       return;
     }
 
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (config.isUseAsyncServer()) {
       sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, 
request, peer);
     } else {
       sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, 
request, peer);
@@ -1653,7 +1719,7 @@ public abstract class RaftMember {
    */
   @SuppressWarnings("java:S2445") // safe synchronized
   public boolean waitForPrevLog(Peer peer, Log log) {
-    final int maxLogDiff = 
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+    final int maxLogDiff = config.getMaxNumOfLogsInMem();
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
     // if the peer falls behind too much, wait until it catches up, otherwise 
there may be too
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index ff156e8..af6272e 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.MetaAsyncService;
@@ -91,7 +92,7 @@ public class DataLogApplierTest extends IoTDBTest {
   private TestMetaGroupMember testMetaGroupMember =
       new TestMetaGroupMember() {
         @Override
-        public boolean syncLeader() {
+        public boolean syncLeader(RaftMember.CheckConsistency 
checkConsistency) {
           return true;
         }
 
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 8cab2c1..84d069a 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -183,7 +183,7 @@ public class DataGroupMemberTest extends MemberTest {
     DataGroupMember dataGroupMember =
         new DataGroupMember(new Factory(), nodes, node, testMetaMember) {
           @Override
-          public boolean syncLeader() {
+          public boolean syncLeader(CheckConsistency checkConsistency) {
             return true;
           }
 
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 724b565..55ee3f3 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -63,6 +64,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -95,6 +97,9 @@ public class MemberTest {
   private boolean prevUseAsyncApplier;
   private boolean prevEnableWAL;
 
+  private int syncLeaderMaxWait;
+  private long heartBeatInterval;
+
   @Before
   public void setUp() throws Exception {
     prevUseAsyncApplier = 
ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier();
@@ -109,6 +114,12 @@ public class MemberTest {
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
     RaftMember.setWaitLeaderTimeMs(10);
 
+    syncLeaderMaxWait = RaftServer.getSyncLeaderMaxWaitMs();
+    heartBeatInterval = RaftServer.getHeartBeatIntervalMs();
+
+    RaftServer.setSyncLeaderMaxWaitMs(100);
+    RaftServer.setHeartBeatIntervalMs(100);
+
     allNodes = new PartitionGroup();
     for (int i = 0; i < 100; i += 10) {
       allNodes.add(TestUtils.getNode(i));
@@ -183,6 +194,9 @@ public class MemberTest {
     
ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(preLogBufferSize);
     
ClusterDescriptor.getInstance().getConfig().setUseAsyncApplier(prevUseAsyncApplier);
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(prevEnableWAL);
+
+    RaftServer.setSyncLeaderMaxWaitMs(syncLeaderMaxWait);
+    RaftServer.setHeartBeatIntervalMs(heartBeatInterval);
   }
 
   DataGroupMember getDataGroupMember(Node node) {
@@ -194,7 +208,7 @@ public class MemberTest {
         new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
 
           @Override
-          public boolean syncLeader() {
+          public boolean syncLeader(RaftMember.CheckConsistency 
checkConsistency) {
             return true;
           }
 
@@ -304,16 +318,21 @@ public class MemberTest {
     return ret;
   }
 
-  private DataGroupMember newDataGroupMemberWithSyncLeader(Node node, boolean 
syncLeader) {
+  private DataGroupMember newDataGroupMemberWithSyncLeaderFalse(Node node, 
boolean syncLeader) {
     DataGroupMember newMember =
         new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
 
           @Override
-          public boolean syncLeader() {
+          public boolean syncLeader(RaftMember.CheckConsistency 
checkConsistency) {
             return syncLeader;
           }
 
           @Override
+          protected long requestCommitIdAsync() {
+            return 5;
+          }
+
+          @Override
           public long appendEntry(AppendEntryRequest request) {
             return Response.RESPONSE_AGREE;
           }
@@ -336,121 +355,169 @@ public class MemberTest {
     return newMember;
   }
 
-  @Test
-  public void testsyncLeaderWithConsistencyCheck() {
+  private DataGroupMember newDataGroupMemberWithSyncLeaderTrue(Node node, 
boolean syncLeader) {
+    DataGroupMember newMember =
+        new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
+
+          @Override
+          public boolean syncLeader(RaftMember.CheckConsistency 
checkConsistency) {
+            return syncLeader;
+          }
+
+          @Override
+          protected long requestCommitIdAsync() {
+            return 1000L;
+          }
+
+          @Override
+          public long appendEntry(AppendEntryRequest request) {
+            return Response.RESPONSE_AGREE;
+          }
 
+          @Override
+          public AsyncClient getAsyncClient(Node node) {
+            try {
+              return new TestAsyncDataClient(node, dataGroupMemberMap);
+            } catch (IOException e) {
+              return null;
+            }
+          }
+        };
+    newMember.setThisNode(node);
+    newMember.setMetaGroupMember(testMetaMember);
+    newMember.setLeader(node);
+    newMember.setCharacter(NodeCharacter.LEADER);
+    newMember.setLogManager(new TestPartitionedLogManager());
+    newMember.setAppendLogThreadPool(testThreadPool);
+    return newMember;
+  }
+
+  @Test
+  public void testsyncLeaderStrongConsistencyCheckFalse() {
     // 1. write request : Strong consistency level with syncLeader false
     DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+        newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
-        .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
-    CheckConsistencyException exception = null;
+        .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
     try {
-      
dataGroupMemberWithWriteStrongConsistencyFalse.syncLeaderWithConsistencyCheck(true);
+      dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(
+          new RaftMember.StrongCheckConsistency());
     } catch (CheckConsistencyException e) {
-      exception = e;
+      Assert.assertNotNull(e);
+      
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION,
 e);
     }
-    Assert.assertNotNull(exception);
-    
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION,
 exception);
+  }
 
-    // 2. write request : Strong consistency level with syncLeader true
+  @Test
+  public void testsyncLeaderStrongConsistencyCheckTrue() {
+    // 1. write request : Strong consistency level with syncLeader false
     DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
-    ClusterDescriptor.getInstance()
-        .getConfig()
-        .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
-    exception = null;
-    try {
-      
dataGroupMemberWithWriteStrongConsistencyTrue.syncLeaderWithConsistencyCheck(true);
-    } catch (CheckConsistencyException e) {
-      exception = e;
-    }
-    Assert.assertNull(exception);
-
-    // 3. Strong consistency level with syncLeader false
-    DataGroupMember dataGroupMemberWithStrongConsistencyFalse =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+        newDataGroupMemberWithSyncLeaderTrue(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
         .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
-    exception = null;
     try {
-      
dataGroupMemberWithStrongConsistencyFalse.syncLeaderWithConsistencyCheck(false);
-    } catch (CheckConsistencyException e) {
-      exception = e;
-    }
-    Assert.assertNotNull(exception);
-    
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION,
 exception);
 
-    // 4. Strong consistency level with syncLeader true
-    DataGroupMember dataGroupMemberWithStrongConsistencyTrue =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
-    ClusterDescriptor.getInstance()
-        .getConfig()
-        .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
-    exception = null;
-    try {
-      
dataGroupMemberWithStrongConsistencyTrue.syncLeaderWithConsistencyCheck(false);
+      PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+          Mockito.mock(PartitionedSnapshotLogManager.class);
+      
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(1000L);
+      
dataGroupMemberWithWriteStrongConsistencyTrue.setLogManager(partitionedSnapshotLogManager);
+
+      dataGroupMemberWithWriteStrongConsistencyTrue.waitUntilCatchUp(
+          new RaftMember.StrongCheckConsistency());
     } catch (CheckConsistencyException e) {
-      exception = e;
+      Assert.fail();
     }
-    Assert.assertNull(exception);
+  }
 
-    // 5. Mid consistency level with syncLeader false
-    DataGroupMember dataGroupMemberWithMidConsistencyFalse =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+  @Test
+  public void testsyncLeaderMidConsistencyCheckFalse() {
+    // 1. write request : Strong consistency level with syncLeader false
+    DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
+        newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
         .setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
-    exception = null;
+    ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(1);
     try {
-      
dataGroupMemberWithMidConsistencyFalse.syncLeaderWithConsistencyCheck(false);
+
+      PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+          Mockito.mock(PartitionedSnapshotLogManager.class);
+      
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(-2L);
+      
dataGroupMemberWithWriteStrongConsistencyFalse.setLogManager(partitionedSnapshotLogManager);
+
+      dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(
+          new RaftMember.MidCheckConsistency());
     } catch (CheckConsistencyException e) {
-      exception = e;
+      
Assert.assertEquals(CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION, 
e);
     }
-    Assert.assertNull(exception);
+  }
 
-    // 6. Mid consistency level with syncLeader true
-    DataGroupMember dataGroupMemberWithMidConsistencyTrue =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
+  @Test
+  public void testsyncLeaderMidConsistencyCheckTrue() {
+    // 1. write request : Strong consistency level with syncLeader false
+    DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue =
+        newDataGroupMemberWithSyncLeaderTrue(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
         .setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
-    exception = null;
+    ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(500);
     try {
-      
dataGroupMemberWithMidConsistencyTrue.syncLeaderWithConsistencyCheck(false);
+
+      PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+          Mockito.mock(PartitionedSnapshotLogManager.class);
+      
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(600L);
+      
dataGroupMemberWithWriteStrongConsistencyTrue.setLogManager(partitionedSnapshotLogManager);
+
+      dataGroupMemberWithWriteStrongConsistencyTrue.waitUntilCatchUp(
+          new RaftMember.MidCheckConsistency());
     } catch (CheckConsistencyException e) {
-      exception = e;
+      Assert.fail();
     }
-    Assert.assertNull(exception);
+  }
 
-    // 7. Weak consistency level with syncLeader false
-    DataGroupMember dataGroupMemberWithWeakConsistencyFalse =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+  @Test
+  public void testsyncLeaderWeakConsistencyCheckFalse() {
+    // 1. write request : Strong consistency level with syncLeader false
+    DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
+        newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
         .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
-    exception = null;
+    ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(1);
     try {
-      
dataGroupMemberWithWeakConsistencyFalse.syncLeaderWithConsistencyCheck(false);
+
+      PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+          Mockito.mock(PartitionedSnapshotLogManager.class);
+      
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(-2L);
+      
dataGroupMemberWithWriteStrongConsistencyFalse.setLogManager(partitionedSnapshotLogManager);
+
+      dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(null);
     } catch (CheckConsistencyException e) {
-      exception = e;
+      Assert.fail();
     }
-    Assert.assertNull(exception);
+  }
 
-    // 8. Weak consistency level with syncLeader true
-    DataGroupMember dataGroupMemberWithWeakConsistencyTrue =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), true);
+  @Test
+  public void testsyncLeaderWeakConsistencyCheckTrue() {
+    // 1. write request : Strong consistency level with syncLeader false
+    DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue =
+        newDataGroupMemberWithSyncLeaderTrue(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
         .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
-    exception = null;
+    ClusterDescriptor.getInstance().getConfig().setMaxReadLogLag(500);
     try {
-      
dataGroupMemberWithWeakConsistencyTrue.syncLeaderWithConsistencyCheck(false);
+
+      PartitionedSnapshotLogManager partitionedSnapshotLogManager =
+          Mockito.mock(PartitionedSnapshotLogManager.class);
+      
Mockito.when(partitionedSnapshotLogManager.getMaxHaveAppliedCommitIndex()).thenReturn(600L);
+      
dataGroupMemberWithWriteStrongConsistencyTrue.setLogManager(partitionedSnapshotLogManager);
+
+      dataGroupMemberWithWriteStrongConsistencyTrue.waitUntilCatchUp(null);
     } catch (CheckConsistencyException e) {
-      exception = e;
+      Assert.fail();
     }
-    Assert.assertNull(exception);
   }
 }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 1a77abf..08914d1 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -193,7 +193,7 @@ public class MetaGroupMemberTest extends MemberTest {
     DataGroupMember dataGroupMember =
         new DataGroupMember(null, group, node, testMetaMember) {
           @Override
-          public boolean syncLeader() {
+          public boolean syncLeader(CheckConsistency checkConsistency) {
             return true;
           }
 

Reply via email to