This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch FollowerRead in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 644a29131cfb5ceffe54415c9517b03bec187e96 Author: JackieTien97 <[email protected]> AuthorDate: Fri Oct 17 08:45:33 2025 +0800 Support a new read_consistency_level --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 7 ++++++- .../plan/AbstractFragmentParallelPlanner.java | 23 ++++++++++++++++++---- .../conf/iotdb-system.properties.template | 1 + .../iotdb/commons/enums/ReadConsistencyLevel.java | 3 ++- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 64d0dc6d214..231a3b77f1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -3342,8 +3342,13 @@ public class IoTDBConfig { public void setReadConsistencyLevel(String readConsistencyLevel) { if ("weak".equalsIgnoreCase(readConsistencyLevel)) { this.readConsistencyLevel = ReadConsistencyLevel.WEAK; - } else { + } else if ("strong".equalsIgnoreCase(readConsistencyLevel)) { this.readConsistencyLevel = ReadConsistencyLevel.STRONG; + } else if ("follower_read".equalsIgnoreCase(readConsistencyLevel)) { + this.readConsistencyLevel = ReadConsistencyLevel.FOLLOWER_READ; + } else { + throw new IllegalArgumentException( + String.format("Unknown readConsistencyLevel %s", readConsistencyLevel)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index 49ed6e36ee6..bcc6e83f14c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -116,7 +116,6 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall throw new IllegalArgumentException( String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); } - boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel; // When planning fragment onto specific DataNode, the DataNode whose endPoint is in // black list won't be considered because it may have connection issue now. @@ -133,13 +132,29 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { LOGGER.info("available replicas: {}", availableDataNodes); } + int targetIndex = getTargetIndex(availableDataNodes); + return availableDataNodes.get(targetIndex); + } + + private int getTargetIndex(List<TDataNodeLocation> availableDataNodes) { int targetIndex; - if (!selectRandomDataNode || queryContext.getSession() == null) { + if (ReadConsistencyLevel.STRONG == this.readConsistencyLevel + || queryContext.getSession() == null) { targetIndex = 0; - } else { + } else if (ReadConsistencyLevel.WEAK == this.readConsistencyLevel) { targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + } else if (ReadConsistencyLevel.FOLLOWER_READ == this.readConsistencyLevel) { + // The first available data node is always leader which is guaranteed by ConfigNode and + // PartitionFetcher in DataNode + // We only need to randomly choose any one from [1, availableDataNodes.size()). 
+ // The session ID is a fixed long value for each connection, so we can use it as a random seed + targetIndex = + (int) (queryContext.getSession().getSessionId() % (availableDataNodes.size() - 1)) + 1; + } else { + throw new IllegalArgumentException( + String.format("Unknown readConsistencyLevel %s", readConsistencyLevel)); } - return availableDataNodes.get(targetIndex); + return targetIndex; } protected FragmentInstance findDownStreamInstance( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 207c0507093..d9a08a646cc 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1026,6 +1026,7 @@ text_compressor=LZ4 # These consistency levels are currently supported: # 1. strong(Default, read from the leader replica) # 2. weak(Read from a random replica) +# 3. follower_read(If follower replicas are available, read from one of them) # effectiveMode: restart # Datatype: string read_consistency_level=strong diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java index 3fb8fdb4e84..70bfcf1708d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java @@ -21,5 +21,6 @@ package org.apache.iotdb.commons.enums; public enum ReadConsistencyLevel { STRONG, - WEAK + WEAK, + FOLLOWER_READ }
