This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FollowerRead
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 644a29131cfb5ceffe54415c9517b03bec187e96
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Oct 17 08:45:33 2025 +0800

    Support a new read_consistency_level
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  7 ++++++-
 .../plan/AbstractFragmentParallelPlanner.java      | 23 ++++++++++++++++++----
 .../conf/iotdb-system.properties.template          |  1 +
 .../iotdb/commons/enums/ReadConsistencyLevel.java  |  3 ++-
 4 files changed, 28 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 64d0dc6d214..231a3b77f1d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -3342,8 +3342,13 @@ public class IoTDBConfig {
   public void setReadConsistencyLevel(String readConsistencyLevel) {
     if ("weak".equalsIgnoreCase(readConsistencyLevel)) {
       this.readConsistencyLevel = ReadConsistencyLevel.WEAK;
-    } else {
+    } else if ("strong".equalsIgnoreCase(readConsistencyLevel)) {
       this.readConsistencyLevel = ReadConsistencyLevel.STRONG;
+    } else if ("follower_read".equalsIgnoreCase(readConsistencyLevel)) {
+      this.readConsistencyLevel = ReadConsistencyLevel.FOLLOWER_READ;
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Unknown readConsistencyLevel %s", 
readConsistencyLevel));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index 49ed6e36ee6..bcc6e83f14c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -116,7 +116,6 @@ public abstract class AbstractFragmentParallelPlanner 
implements IFragmentParall
       throw new IllegalArgumentException(
           String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
     }
-    boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == 
this.readConsistencyLevel;
 
     // When planning fragment onto specific DataNode, the DataNode whose 
endPoint is in
     // black list won't be considered because it may have connection issue now.
@@ -133,13 +132,29 @@ public abstract class AbstractFragmentParallelPlanner 
implements IFragmentParall
     if (regionReplicaSet.getDataNodeLocationsSize() != 
availableDataNodes.size()) {
       LOGGER.info("available replicas: {}", availableDataNodes);
     }
+    int targetIndex = getTargetIndex(availableDataNodes);
+    return availableDataNodes.get(targetIndex);
+  }
+
+  private int getTargetIndex(List<TDataNodeLocation> availableDataNodes) {
     int targetIndex;
-    if (!selectRandomDataNode || queryContext.getSession() == null) {
+    if (ReadConsistencyLevel.STRONG == this.readConsistencyLevel
+        || queryContext.getSession() == null) {
       targetIndex = 0;
-    } else {
+    } else if (ReadConsistencyLevel.WEAK == this.readConsistencyLevel) {
       targetIndex = (int) (queryContext.getSession().getSessionId() % 
availableDataNodes.size());
+    } else if (ReadConsistencyLevel.FOLLOWER_READ == 
this.readConsistencyLevel) {
+      // The first available data node is always the leader, which is guaranteed by 
ConfigNode and
+      // PartitionFetcher in DataNode.
+      // We only need to randomly choose any one from [1, 
availableDataNodes.size()).
+      // SessionId is an unchanged long value for each connection, so we can 
use that as a random seed.
+      targetIndex =
+          (int) (queryContext.getSession().getSessionId() % 
(availableDataNodes.size() - 1)) + 1;
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Unknown readConsistencyLevel %s", 
readConsistencyLevel));
     }
-    return availableDataNodes.get(targetIndex);
+    return targetIndex;
   }
 
   protected FragmentInstance findDownStreamInstance(
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 207c0507093..d9a08a646cc 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1026,6 +1026,7 @@ text_compressor=LZ4
 # These consistency levels are currently supported:
 # 1. strong(Default, read from the leader replica)
 # 2. weak(Read from a random replica)
+# 3. follower_read(If follower replicas are available, read from one of 
them.)
 # effectiveMode: restart
 # Datatype: string
 read_consistency_level=strong
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
index 3fb8fdb4e84..70bfcf1708d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
@@ -21,5 +21,6 @@ package org.apache.iotdb.commons.enums;
 
 public enum ReadConsistencyLevel {
   STRONG,
-  WEAK
+  WEAK,
+  FOLLOWER_READ
 }

Reply via email to