This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch WeakRead
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 63a25b8b54cfe78f13417f0c2a4a809877bf3850
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Aug 20 09:06:38 2025 +0800

    Weak Read Consistency Level Refactor
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++---
 .../plan/AbstractFragmentParallelPlanner.java      | 20 ++++++++---------
 .../iotdb/commons/enums/ReadConsistencyLevel.java  | 25 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a8a4c796557..6e6ac8f0c40 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.audit.AuditLogOperation;
@@ -975,7 +976,7 @@ public class IoTDBConfig {
   private long detailContainerMinDegradeMemoryInBytes = 1024 * 1024L;
   private int schemaThreadCount = 5;
 
-  private String readConsistencyLevel = "strong";
+  private ReadConsistencyLevel readConsistencyLevel = ReadConsistencyLevel.STRONG;
 
   /** Maximum execution time of a DriverTask */
   private int driverTaskExecutionTimeSliceInMs = 200;
@@ -3304,12 +3305,16 @@ public class IoTDBConfig {
     this.schemaThreadCount = schemaThreadCount;
   }
 
-  public String getReadConsistencyLevel() {
+  public ReadConsistencyLevel getReadConsistencyLevel() {
     return readConsistencyLevel;
   }
 
   public void setReadConsistencyLevel(String readConsistencyLevel) {
-    this.readConsistencyLevel = readConsistencyLevel;
+    if ("weak".equalsIgnoreCase(readConsistencyLevel)) {
+      this.readConsistencyLevel = ReadConsistencyLevel.WEAK;
+    } else {
+      this.readConsistencyLevel = ReadConsistencyLevel.STRONG;
+    }
   }
 
   public int getDriverTaskExecutionTimeSliceInMs() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index 84489a630aa..49ed6e36ee6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.partition.QueryExecutor;
 import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -33,7 +35,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -45,24 +46,27 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
 
 public abstract class AbstractFragmentParallelPlanner implements IFragmentParallelPlaner {
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class);
+      LoggerFactory.getLogger(AbstractFragmentParallelPlanner.class);
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private final ReadConsistencyLevel readConsistencyLevel;
 
-  protected MPPQueryContext queryContext;
+  protected final MPPQueryContext queryContext;
 
   protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) {
     this.queryContext = queryContext;
+    this.readConsistencyLevel = CONFIG.getReadConsistencyLevel();
   }
 
   protected void selectExecutorAndHost(
       PlanFragment fragment,
       FragmentInstance fragmentInstance,
       Supplier<TRegionReplicaSet> replicaSetProvider,
-      Function<TRegionReplicaSet, TRegionReplicaSet> validator,
+      UnaryOperator<TRegionReplicaSet> validator,
       Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap) {
     // Get the target region for origin PlanFragment, then its instance will be distributed one
     // of them.
@@ -112,11 +116,7 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall
       throw new IllegalArgumentException(
           String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
     }
-    String readConsistencyLevel =
-        IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
-    // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
-    // enums
-    boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+    boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel;
 
     // When planning fragment onto specific DataNode, the DataNode whose endPoint is in
     // black list won't be considered because it may have connection issue now.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
new file mode 100644
index 00000000000..3fb8fdb4e84
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.enums;
+
+public enum ReadConsistencyLevel {
+  STRONG,
+  WEAK
+}

Reply via email to