This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch WeakRead in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 63a25b8b54cfe78f13417f0c2a4a809877bf3850 Author: JackieTien97 <[email protected]> AuthorDate: Wed Aug 20 09:06:38 2025 +0800 Weak Read Consistency Level Refactor --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++--- .../plan/AbstractFragmentParallelPlanner.java | 20 ++++++++--------- .../iotdb/commons/enums/ReadConsistencyLevel.java | 25 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index a8a4c796557..6e6ac8f0c40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.enums.ReadConsistencyLevel; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.audit.AuditLogOperation; @@ -975,7 +976,7 @@ public class IoTDBConfig { private long detailContainerMinDegradeMemoryInBytes = 1024 * 1024L; private int schemaThreadCount = 5; - private String readConsistencyLevel = "strong"; + private ReadConsistencyLevel readConsistencyLevel = ReadConsistencyLevel.STRONG; /** Maximum execution time of a DriverTask */ private int driverTaskExecutionTimeSliceInMs = 200; @@ -3304,12 +3305,16 @@ public class IoTDBConfig { this.schemaThreadCount = schemaThreadCount; } - public String getReadConsistencyLevel() { + public ReadConsistencyLevel getReadConsistencyLevel() { return readConsistencyLevel; } public void setReadConsistencyLevel(String readConsistencyLevel) { - this.readConsistencyLevel = readConsistencyLevel; + if ("weak".equalsIgnoreCase(readConsistencyLevel)) { + this.readConsistencyLevel = ReadConsistencyLevel.WEAK; + } else { + this.readConsistencyLevel = ReadConsistencyLevel.STRONG; + } } public int getDriverTaskExecutionTimeSliceInMs() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index 84489a630aa..49ed6e36ee6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -22,9 +22,11 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.enums.ReadConsistencyLevel; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.partition.QueryExecutor; import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -33,7 +35,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.collections4.CollectionUtils; @@ -45,24 +46,27 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.UnaryOperator; public abstract class AbstractFragmentParallelPlanner implements IFragmentParallelPlaner { private static final Logger LOGGER = - LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class); + LoggerFactory.getLogger(AbstractFragmentParallelPlanner.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private final ReadConsistencyLevel readConsistencyLevel; - protected MPPQueryContext queryContext; + protected final MPPQueryContext queryContext; protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) { this.queryContext = queryContext; + this.readConsistencyLevel = CONFIG.getReadConsistencyLevel(); } protected void selectExecutorAndHost( PlanFragment fragment, FragmentInstance fragmentInstance, Supplier<TRegionReplicaSet> replicaSetProvider, - Function<TRegionReplicaSet, TRegionReplicaSet> validator, + UnaryOperator<TRegionReplicaSet> validator, Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap) { // Get the target region for origin PlanFragment, then its instance will be distributed one // of them. @@ -112,11 +116,7 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall throw new IllegalArgumentException( String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); } - String readConsistencyLevel = - IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); - // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or - // enums - boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); + boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel; // When planning fragment onto specific DataNode, the DataNode whose endPoint is in // black list won't be considered because it may have connection issue now. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java new file mode 100644 index 00000000000..3fb8fdb4e84 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.enums; + +public enum ReadConsistencyLevel { + STRONG, + WEAK +}
