This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7fc93b040 [IOTDB-6251] Make read in SchemaRegion linearizable by default (#11571)
fa7fc93b040 is described below

commit fa7fc93b04029b776db402be50717c7b4ef8a611
Author: William Song <[email protected]>
AuthorDate: Fri Nov 24 23:12:21 2023 +0800

    [IOTDB-6251] Make read in SchemaRegion linearizable by default (#11571)
---
 .../apache/iotdb/consensus/config/RatisConfig.java |  2 +-
 ...ion.java => RatisReadUnavailableException.java} | 11 ++++----
 .../ratis/ApplicationStateMachineProxy.java        | 17 ++++--------
 .../iotdb/consensus/ratis/RatisConsensus.java      | 32 +++++++++++++++++-----
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |  8 ++----
 .../iotdb/consensus/ratis/RecoverReadTest.java     |  4 +--
 .../apache/iotdb/consensus/ratis/TestUtils.java    |  4 +--
 .../db/consensus/SchemaRegionConsensusImpl.java    |  1 +
 8 files changed, 44 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 6ea4d71604b..ad40495972b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -1123,7 +1123,7 @@ public class RatisConfig {
     }
 
     public static class Builder {
-      private Read.Option readOption = Option.LINEARIZABLE;
+      private Read.Option readOption = Option.DEFAULT;
       private TimeDuration readTimeout = TimeDuration.valueOf(10, 
TimeUnit.SECONDS);
 
       public Read.Builder setReadOption(Read.Option readOption) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
similarity index 71%
rename from 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
rename to 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
index edf67629890..d5ac76b93fe 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
@@ -19,14 +19,13 @@
 
 package org.apache.iotdb.consensus.exception;
 
-/** RaftServer is redoing RaftLog. Unable to serve linearizable read requests. 
*/
-public class RatisUnderRecoveryException extends ConsensusException {
+/** RaftServer is unable to serve linearizable read requests. */
+public class RatisReadUnavailableException extends ConsensusException {
 
-  public RatisUnderRecoveryException(Throwable cause) {
+  public RatisReadUnavailableException(Throwable cause) {
     super(
-        "Raft Server is redoing Raft Log and cannot serve read requests now. "
-            + "Please try read later: "
-            + cause,
+        "Raft Server cannot serve read requests now (leader is unknown or 
under recovery). "
+            + "Please try read later: ",
         cause);
   }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 1460398ab05..06e7df52891 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -55,10 +54,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 
 public class ApplicationStateMachineProxy extends BaseStateMachine {
 
@@ -68,9 +65,8 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
   private final IStateMachine applicationStateMachine;
   private final SnapshotStorage snapshotStorage;
   private final RaftGroupId groupId;
-  private final ConsensusGroupId consensusGroupId;
   private final TConsensusGroupType consensusGroupType;
-  private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> 
canStaleRead;
+  private final BiConsumer<RaftGroupMemberId, RaftPeerId> leaderChangeListener;
 
   ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) {
     this(stateMachine, id, null);
@@ -79,11 +75,10 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
   ApplicationStateMachineProxy(
       IStateMachine stateMachine,
       RaftGroupId id,
-      ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> canStaleRead) {
+      BiConsumer<RaftGroupMemberId, RaftPeerId> onLeaderChanged) {
     this.applicationStateMachine = stateMachine;
-    this.canStaleRead = canStaleRead;
+    this.leaderChangeListener = onLeaderChanged;
     this.groupId = id;
-    this.consensusGroupId = Utils.fromRaftGroupIdToConsensusGroupId(id);
     snapshotStorage = new SnapshotStorage(applicationStateMachine, groupId);
     consensusGroupType = 
Utils.getConsensusGroupTypeFromPrefix(groupId.toString());
     applicationStateMachine.start();
@@ -282,9 +277,7 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
 
   @Override
   public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId 
newLeaderId) {
-    Optional.ofNullable(canStaleRead)
-        .ifPresent(
-            m -> m.computeIfAbsent(consensusGroupId, id -> new 
AtomicBoolean(false)).set(false));
+    leaderChangeListener.accept(groupMemberId, newLeaderId);
     applicationStateMachine
         .event()
         .notifyLeaderChanged(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 349bde3a804..2ae303ec96c 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -44,8 +44,8 @@ import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
-import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
 import org.apache.iotdb.consensus.ratis.utils.Retriable;
@@ -67,6 +67,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SnapshotManagementRequest;
@@ -127,14 +128,14 @@ class RatisConsensus implements IConsensus {
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
   private final RatisConfig config;
+  private final RatisConfig.Read.Option readOption;
   private final RetryPolicy<RaftClientReply> readRetryPolicy;
   private final RetryPolicy<RaftClientReply> writeRetryPolicy;
 
   private final RatisMetricSet ratisMetricSet;
   private final TConsensusGroupType consensusGroupType;
 
-  private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> 
canServeStaleRead =
-      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> 
canServeStaleRead;
 
   public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry)
       throws IOException {
@@ -148,6 +149,9 @@ class RatisConsensus implements IConsensus {
 
     Utils.initRatisConfig(properties, config.getRatisConfig());
     this.config = config.getRatisConfig();
+    this.readOption = this.config.getRead().getReadOption();
+    this.canServeStaleRead =
+        this.readOption == RatisConfig.Read.Option.DEFAULT ? new 
ConcurrentHashMap<>() : null;
     this.consensusGroupType = config.getConsensusGroupType();
     this.ratisMetricSet = new RatisMetricSet();
     this.readRetryPolicy =
@@ -188,7 +192,7 @@ class RatisConsensus implements IConsensus {
                     new ApplicationStateMachineProxy(
                         
registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
                         raftGroupId,
-                        canServeStaleRead))
+                        this::onLeaderChanged))
             .build();
   }
 
@@ -327,21 +331,25 @@ class RatisConsensus implements IConsensus {
       throw new ConsensusGroupNotExistException(groupId);
     }
 
+    // perform linearizable read under following two conditions:
+    // 1. Read.Option is linearizable
+    // 2. First probing read when Read.Option is default
     final boolean isLinearizableRead =
-        !canServeStaleRead.computeIfAbsent(groupId, id -> new 
AtomicBoolean(false)).get();
+        readOption == RatisConfig.Read.Option.LINEARIZABLE
+            || !canServeStaleRead.computeIfAbsent(groupId, id -> new 
AtomicBoolean(false)).get();
 
     RaftClientReply reply;
     try {
       reply = doRead(raftGroupId, request, isLinearizableRead);
       // allow stale read if current linearizable read returns successfully
-      if (isLinearizableRead) {
+      if (canServeStaleRead != null && isLinearizableRead) {
         canServeStaleRead.get(groupId).set(true);
       }
     } catch (ReadException | ReadIndexException e) {
       if (isLinearizableRead) {
         // linearizable read failed. the RaftServer is recovering from Raft 
Log and cannot serve
         // read requests.
-        throw new RatisUnderRecoveryException(e);
+        throw new RatisReadUnavailableException(e);
       } else {
         throw new RatisRequestFailedException(e);
       }
@@ -766,6 +774,16 @@ class RatisConsensus implements IConsensus {
     return reply;
   }
 
+  private void onLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId 
leaderId) {
+    Optional.ofNullable(canServeStaleRead)
+        .ifPresent(
+            m -> {
+              final ConsensusGroupId gid =
+                  
Utils.fromRaftGroupIdToConsensusGroupId(groupMemberId.getGroupId());
+              canServeStaleRead.computeIfAbsent(gid, id -> new 
AtomicBoolean()).set(false);
+            });
+  }
+
   @TestOnly
   public RaftServer getServer() {
     return server;
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index 5390420b304..e3f4d2328c5 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -307,11 +307,9 @@ public class Utils {
     RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(
         properties, config.getRpc().getFirstElectionTimeoutMax());
 
-    RaftServerConfigKeys.Read.Option option =
-        config.getRead().getReadOption() == RatisConfig.Read.Option.DEFAULT
-            ? RaftServerConfigKeys.Read.Option.DEFAULT
-            : RaftServerConfigKeys.Read.Option.LINEARIZABLE;
-    RaftServerConfigKeys.Read.setOption(properties, option);
+    /* linearizable means we can obtain consistent data from followers.
+    If we prefer latency, we can directly use staleRead */
+    RaftServerConfigKeys.Read.setOption(properties, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
     RaftServerConfigKeys.Read.setTimeout(properties, 
config.getRead().getReadTimeout());
 
     RaftServerConfigKeys.setSleepDeviationThreshold(
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index 55d12730ffa..68df45a74a8 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.config.RatisConfig;
-import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 
 import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
@@ -228,6 +228,6 @@ public class RecoverReadTest {
     miniCluster.waitUntilActiveLeader();
 
     // query during redo: get exception that ratis is under recovery
-    Assert.assertThrows(RatisUnderRecoveryException.class, () -> 
miniCluster.readThrough(0));
+    Assert.assertThrows(RatisReadUnavailableException.class, () -> 
miniCluster.readThrough(0));
   }
 }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 64f00266d71..275097796ba 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -35,7 +35,7 @@ import 
org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 
 import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
 import org.apache.ratis.util.FileUtils;
@@ -406,7 +406,7 @@ public class TestUtils {
         try {
           readResp = readThrough(serverIndex);
           break;
-        } catch (RatisUnderRecoveryException e) {
+        } catch (RatisReadUnavailableException e) {
           logger.warn("ratis is redoing raft log, shall wait some time: ", e);
           waitDuration.sleep();
         } catch (ConsensusException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 9f6d42ca5f8..711fa5d36a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -142,6 +142,7 @@ public class SchemaRegionConsensusImpl {
                                     .build())
                             .setRead(
                                 RatisConfig.Read.newBuilder()
+                                    
.setReadOption(RatisConfig.Read.Option.LINEARIZABLE)
                                     // use thrift connection timeout to unify 
read timeout
                                     .setReadTimeout(
                                         TimeDuration.valueOf(

Reply via email to