This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 98acf8e5090 Optimized the query logic to make schema or weak data query read from local first & random if without local (#15106)
98acf8e5090 is described below

commit 98acf8e5090592ff4a55f4b6d0943cf47c941807
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 17 19:12:16 2025 +0800

    Optimized the query logic to make schema or weak data query read from local first & random if without local (#15106)
---
 .../distribution/SimpleFragmentParallelPlanner.java        | 14 ++++++++++++--
 .../planner/distribute/TableModelQueryFragmentPlanner.java | 14 ++++++++++++--
 .../plan/scheduler/FragmentInstanceDispatcherImpl.java     |  9 +++------
 3 files changed, 27 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 3ed0ffa7fe4..259aec29526 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.queryengine.plan.planner.distribution;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -39,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
+import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragmentInstanceDispatcherImpl;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
@@ -205,7 +207,9 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
         IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
     // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel 
as static variable or
     // enums
-    boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+    boolean selectLocalOrRandomDataNode =
+        "weak".equals(readConsistencyLevel)
+            || 
regionReplicaSet.getRegionId().getType().equals(TConsensusGroupType.SchemaRegion);
 
     // When planning fragment onto specific DataNode, the DataNode whose 
endPoint is in
     // black list won't be considered because it may have connection issue now.
@@ -222,9 +226,15 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
       logger.info("available replicas: {}", availableDataNodes);
     }
     int targetIndex;
-    if (!selectRandomDataNode || queryContext.getSession() == null) {
+    if (!selectLocalOrRandomDataNode || queryContext.getSession() == null) {
       targetIndex = 0;
     } else {
+      // Always choose local dataNode first
+      for (final TDataNodeLocation location : availableDataNodes) {
+        if 
(FragmentInstanceDispatcherImpl.isDispatchedToLocal(location.getInternalEndPoint()))
 {
+          return location;
+        }
+      }
       targetIndex = (int) (queryContext.getSession().getSessionId() % 
availableDataNodes.size());
     }
     return availableDataNodes.get(targetIndex);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index 8141acbe3a3..c57399a3f41 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -42,6 +43,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragmentInstanceDispatcherImpl;
 
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -172,7 +174,9 @@ public class TableModelQueryFragmentPlanner {
     }
     String readConsistencyLevel =
         IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
-    boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+    boolean selectLocalOrRandomDataNode =
+        "weak".equals(readConsistencyLevel)
+            || 
regionReplicaSet.getRegionId().getType().equals(TConsensusGroupType.SchemaRegion);
 
     // When planning fragment onto specific DataNode, the DataNode whose 
endPoint is in
     // black list won't be considered because it may have connection issue now.
@@ -189,9 +193,15 @@ public class TableModelQueryFragmentPlanner {
       LOGGER.info("Available replicas: {}", availableDataNodes);
     }
     int targetIndex;
-    if (!selectRandomDataNode || queryContext.getSession() == null) {
+    if (!selectLocalOrRandomDataNode || queryContext.getSession() == null) {
       targetIndex = 0;
     } else {
+      // Always choose local dataNode first
+      for (final TDataNodeLocation location : availableDataNodes) {
+        if 
(FragmentInstanceDispatcherImpl.isDispatchedToLocal(location.getInternalEndPoint()))
 {
+          return location;
+        }
+      }
       targetIndex = (int) (queryContext.getSession().getSessionId() % 
availableDataNodes.size());
     }
     return availableDataNodes.get(targetIndex);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db0c270d176..94fcc8bbb4d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -81,8 +81,6 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
   private final ExecutorService writeOperationExecutor;
   private final QueryType type;
   private final MPPQueryContext queryContext;
-  private final String localhostIpAddr;
-  private final int localhostInternalPort;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       syncInternalServiceClientManager;
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -111,8 +109,6 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     this.writeOperationExecutor = writeOperationExecutor;
     this.syncInternalServiceClientManager = syncInternalServiceClientManager;
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
-    this.localhostIpAddr = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
-    this.localhostInternalPort = 
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
   }
 
   @Override
@@ -308,8 +304,9 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     }
   }
 
-  private boolean isDispatchedToLocal(TEndPoint endPoint) {
-    return this.localhostIpAddr.equals(endPoint.getIp()) && 
localhostInternalPort == endPoint.port;
+  public static boolean isDispatchedToLocal(TEndPoint endPoint) {
+    return 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress().equals(endPoint.getIp())
+        && IoTDBDescriptor.getInstance().getConfig().getInternalPort() == 
endPoint.port;
   }
 
   private void dispatchRemoteHelper(final FragmentInstance instance, final 
TEndPoint endPoint)

Reply via email to