This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98acf8e5090 Optimized the query logic to make schema or weak data
query read from local first & random if without local (#15106)
98acf8e5090 is described below
commit 98acf8e5090592ff4a55f4b6d0943cf47c941807
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 17 19:12:16 2025 +0800
Optimized the query logic to make schema or weak data query read from local
first & random if without local (#15106)
---
.../distribution/SimpleFragmentParallelPlanner.java | 14 ++++++++++++--
.../planner/distribute/TableModelQueryFragmentPlanner.java | 14 ++++++++++++--
.../plan/scheduler/FragmentInstanceDispatcherImpl.java | 9 +++------
3 files changed, 27 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 3ed0ffa7fe4..259aec29526 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -39,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
+import
org.apache.iotdb.db.queryengine.plan.scheduler.FragmentInstanceDispatcherImpl;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
@@ -205,7 +207,9 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
// TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
// enums
- boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+ boolean selectLocalOrRandomDataNode =
+ "weak".equals(readConsistencyLevel)
+ ||
regionReplicaSet.getRegionId().getType().equals(TConsensusGroupType.SchemaRegion);
// When planning fragment onto specific DataNode, the DataNode whose endPoint is in
// black list won't be considered because it may have connection issue now.
@@ -222,9 +226,15 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
logger.info("available replicas: {}", availableDataNodes);
}
int targetIndex;
- if (!selectRandomDataNode || queryContext.getSession() == null) {
+ if (!selectLocalOrRandomDataNode || queryContext.getSession() == null) {
targetIndex = 0;
} else {
+ // Always choose local dataNode first
+ for (final TDataNodeLocation location : availableDataNodes) {
+ if
(FragmentInstanceDispatcherImpl.isDispatchedToLocal(location.getInternalEndPoint()))
{
+ return location;
+ }
+ }
targetIndex = (int) (queryContext.getSession().getSessionId() %
availableDataNodes.size());
}
return availableDataNodes.get(targetIndex);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index 8141acbe3a3..c57399a3f41 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -42,6 +43,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import
org.apache.iotdb.db.queryengine.plan.scheduler.FragmentInstanceDispatcherImpl;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -172,7 +174,9 @@ public class TableModelQueryFragmentPlanner {
}
String readConsistencyLevel =
IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
- boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+ boolean selectLocalOrRandomDataNode =
+ "weak".equals(readConsistencyLevel)
+ ||
regionReplicaSet.getRegionId().getType().equals(TConsensusGroupType.SchemaRegion);
// When planning fragment onto specific DataNode, the DataNode whose endPoint is in
// black list won't be considered because it may have connection issue now.
@@ -189,9 +193,15 @@ public class TableModelQueryFragmentPlanner {
LOGGER.info("Available replicas: {}", availableDataNodes);
}
int targetIndex;
- if (!selectRandomDataNode || queryContext.getSession() == null) {
+ if (!selectLocalOrRandomDataNode || queryContext.getSession() == null) {
targetIndex = 0;
} else {
+ // Always choose local dataNode first
+ for (final TDataNodeLocation location : availableDataNodes) {
+ if
(FragmentInstanceDispatcherImpl.isDispatchedToLocal(location.getInternalEndPoint()))
{
+ return location;
+ }
+ }
targetIndex = (int) (queryContext.getSession().getSessionId() %
availableDataNodes.size());
}
return availableDataNodes.get(targetIndex);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db0c270d176..94fcc8bbb4d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -81,8 +81,6 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
private final ExecutorService writeOperationExecutor;
private final QueryType type;
private final MPPQueryContext queryContext;
- private final String localhostIpAddr;
- private final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager;
private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -111,8 +109,6 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
this.writeOperationExecutor = writeOperationExecutor;
this.syncInternalServiceClientManager = syncInternalServiceClientManager;
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
- this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
- this.localhostInternalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
}
@Override
@@ -308,8 +304,9 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
}
- private boolean isDispatchedToLocal(TEndPoint endPoint) {
- return this.localhostIpAddr.equals(endPoint.getIp()) &&
localhostInternalPort == endPoint.port;
+ public static boolean isDispatchedToLocal(TEndPoint endPoint) {
+ return
IoTDBDescriptor.getInstance().getConfig().getInternalAddress().equals(endPoint.getIp())
+ && IoTDBDescriptor.getInstance().getConfig().getInternalPort() ==
endPoint.port;
}
private void dispatchRemoteHelper(final FragmentInstance instance, final
TEndPoint endPoint)