This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c396fd81b84 Weak Read Consistency Level Refactor & Print
InternalAddress and port in explain analyze
c396fd81b84 is described below
commit c396fd81b8440e29743bfe1003e5f69601b3bb39
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Aug 20 14:09:04 2025 +0800
Weak Read Consistency Level Refactor & Print InternalAddress and port in
explain analyze
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++---
.../fragment/FragmentInstanceExecution.java | 4 +++-
.../plan/AbstractFragmentParallelPlanner.java | 20 ++++++++---------
.../iotdb/commons/enums/ReadConsistencyLevel.java | 25 ++++++++++++++++++++++
4 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a8a4c796557..6e6ac8f0c40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.audit.AuditLogOperation;
@@ -975,7 +976,7 @@ public class IoTDBConfig {
private long detailContainerMinDegradeMemoryInBytes = 1024 * 1024L;
private int schemaThreadCount = 5;
- private String readConsistencyLevel = "strong";
+ private ReadConsistencyLevel readConsistencyLevel =
ReadConsistencyLevel.STRONG;
/** Maximum execution time of a DriverTask */
private int driverTaskExecutionTimeSliceInMs = 200;
@@ -3304,12 +3305,16 @@ public class IoTDBConfig {
this.schemaThreadCount = schemaThreadCount;
}
- public String getReadConsistencyLevel() {
+ public ReadConsistencyLevel getReadConsistencyLevel() {
return readConsistencyLevel;
}
public void setReadConsistencyLevel(String readConsistencyLevel) {
- this.readConsistencyLevel = readConsistencyLevel;
+ if ("weak".equalsIgnoreCase(readConsistencyLevel)) {
+ this.readConsistencyLevel = ReadConsistencyLevel.WEAK;
+ } else {
+ this.readConsistencyLevel = ReadConsistencyLevel.STRONG;
+ }
}
public int getDriverTaskExecutionTimeSliceInMs() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index 7681475c4d1..92471853f60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -53,6 +54,7 @@ import static
org.apache.iotdb.db.queryengine.statistics.StatisticsMergeUtil.mer
public class FragmentInstanceExecution {
private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceExecution.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
@@ -152,7 +154,7 @@ public class FragmentInstanceExecution {
return false;
}
statistics.setDataRegion(((DataRegion)
context.getDataRegion()).getDataRegionId());
-
statistics.setIp(IoTDBDescriptor.getInstance().getConfig().getAddressAndPort().ip);
+ statistics.setIp(CONFIG.getInternalAddress() + ":" +
CONFIG.getInternalPort());
statistics.setStartTimeInMS(context.getStartTime());
statistics.setEndTimeInMS(
context.isEndTimeUpdate() ? context.getEndTime() :
System.currentTimeMillis());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index 84489a630aa..49ed6e36ee6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.partition.QueryExecutor;
import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -33,7 +35,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
import
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.collections4.CollectionUtils;
@@ -45,24 +46,27 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
public abstract class AbstractFragmentParallelPlanner implements
IFragmentParallelPlaner {
private static final Logger LOGGER =
- LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class);
+ LoggerFactory.getLogger(AbstractFragmentParallelPlanner.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private final ReadConsistencyLevel readConsistencyLevel;
- protected MPPQueryContext queryContext;
+ protected final MPPQueryContext queryContext;
protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) {
this.queryContext = queryContext;
+ this.readConsistencyLevel = CONFIG.getReadConsistencyLevel();
}
protected void selectExecutorAndHost(
PlanFragment fragment,
FragmentInstance fragmentInstance,
Supplier<TRegionReplicaSet> replicaSetProvider,
- Function<TRegionReplicaSet, TRegionReplicaSet> validator,
+ UnaryOperator<TRegionReplicaSet> validator,
Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap) {
// Get the target region for origin PlanFragment, then its instance will
be distributed one
// of them.
@@ -112,11 +116,7 @@ public abstract class AbstractFragmentParallelPlanner
implements IFragmentParall
throw new IllegalArgumentException(
String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
}
- String readConsistencyLevel =
- IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
- // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel
as static variable or
- // enums
- boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+ boolean selectRandomDataNode = ReadConsistencyLevel.WEAK ==
this.readConsistencyLevel;
// When planning fragment onto specific DataNode, the DataNode whose
endPoint is in
// black list won't be considered because it may have connection issue now.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
new file mode 100644
index 00000000000..3fb8fdb4e84
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/ReadConsistencyLevel.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.enums;
+
+public enum ReadConsistencyLevel {
+ STRONG,
+ WEAK
+}