This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 40b3ef4ca84 Correct the way Schema related PlanNode in
DistributionPlanner
40b3ef4ca84 is described below
commit 40b3ef4ca8475cc7fd4a668fdcaec36477b73efd
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jul 9 19:46:56 2024 +0800
Correct the way Schema related PlanNode in DistributionPlanner
---
.../plan/planner/LocalExecutionPlanner.java | 36 ++---
.../plan/planner/TableOperatorGenerator.java | 19 ---
.../node/metedata/read/TableDeviceFetchNode.java | 6 +-
.../node/metedata/read/TableDeviceSourceNode.java | 23 ++++
.../fetcher/TableDeviceSchemaValidator.java | 2 +-
.../planner/distribute/AddExchangeNodes.java | 24 ++++
.../distribute/DistributedPlanGenerator.java | 147 ++++++++++++---------
.../apache/iotdb/commons/partition/Partition.java | 2 +-
8 files changed, 159 insertions(+), 100 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 3b74f1b4a08..1f794df63fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -92,20 +92,7 @@ public class LocalExecutionPlanner {
LocalExecutionPlanContext context =
new LocalExecutionPlanContext(types, instanceContext,
dataNodeQueryContext);
- // Generate pipelines, return the last pipeline data structure
- // TODO Replace operator with operatorFactory to build multiple driver for
one pipeline
- Operator root;
- IClientSession.SqlDialect sqlDialect =
instanceContext.getSessionInfo().getSqlDialect();
- switch (sqlDialect) {
- case TREE:
- root = plan.accept(new OperatorTreeGenerator(), context);
- break;
- case TABLE:
- root = plan.accept(new TableOperatorGenerator(metadata), context);
- break;
- default:
- throw new IllegalArgumentException(String.format("Unknown sql dialect:
%s", sqlDialect));
- }
+ Operator root = generateOperator(instanceContext, context, plan);
PipelineMemoryEstimator memoryEstimator =
context.constructPipelineMemoryEstimator(root, null, plan, -1);
@@ -136,7 +123,7 @@ public class LocalExecutionPlanner {
LocalExecutionPlanContext context =
new LocalExecutionPlanContext(instanceContext, schemaRegion);
- Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ Operator root = generateOperator(instanceContext, context, plan);
PipelineMemoryEstimator memoryEstimator =
context.constructPipelineMemoryEstimator(root, null, plan, -1);
@@ -154,6 +141,25 @@ public class LocalExecutionPlanner {
return context.getPipelineDriverFactories();
}
+ private Operator generateOperator(
+ FragmentInstanceContext instanceContext, LocalExecutionPlanContext
context, PlanNode node) {
+ // Generate pipelines, return the last pipeline data structure
+ // TODO Replace operator with operatorFactory to build multiple driver for
one pipeline
+ Operator root;
+ IClientSession.SqlDialect sqlDialect =
instanceContext.getSessionInfo().getSqlDialect();
+ switch (sqlDialect) {
+ case TREE:
+ root = node.accept(new OperatorTreeGenerator(), context);
+ break;
+ case TABLE:
+ root = node.accept(new TableOperatorGenerator(metadata), context);
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown sql dialect:
%s", sqlDialect));
+ }
+ return root;
+ }
+
private long checkMemory(
final PipelineMemoryEstimator memoryEstimator,
FragmentInstanceStateMachine stateMachine)
throws MemoryNotEnoughException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index a26f284e0d1..b1ba6ded7f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator
import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
-import
org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryMergeOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
import
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
@@ -53,7 +52,6 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -779,23 +777,6 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber());
}
- @Override
- public Operator visitSchemaQueryMerge(
- SchemaQueryMergeNode node, LocalExecutionPlanContext context) {
- List<Operator> children = new ArrayList<>(node.getChildren().size());
- for (PlanNode child : node.getChildren()) {
- children.add(child.accept(this, context));
- }
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- SchemaQueryMergeOperator.class.getSimpleName());
- return new SchemaQueryMergeOperator(operatorContext, children);
- }
-
@Override
public Operator visitTableDeviceFetch(
TableDeviceFetchNode node, LocalExecutionPlanContext context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
index 28a06fda562..d8e0f5e9028 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
public class TableDeviceFetchNode extends TableDeviceSourceNode {
- private List<Object[]> deviceIdList;
+ private final List<Object[]> deviceIdList;
public TableDeviceFetchNode(
PlanNodeId id,
@@ -51,6 +51,10 @@ public class TableDeviceFetchNode extends
TableDeviceSourceNode {
this.deviceIdList = deviceIdList;
}
+ public void addDeviceId(Object[] deviceId) {
+ deviceIdList.add(deviceId);
+ }
+
public List<Object[]> getDeviceIdList() {
return deviceIdList;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
index 0bc6ee2da94..f2cb19041c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public abstract class TableDeviceSourceNode extends SourceNode {
@@ -97,4 +98,26 @@ public abstract class TableDeviceSourceNode extends
SourceNode {
public int allowedChildCount() {
return 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ TableDeviceSourceNode that = (TableDeviceSourceNode) o;
+ return Objects.equals(database, that.database)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(schemaRegionReplicaSet, that.schemaRegionReplicaSet);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), database, tableName,
schemaRegionReplicaSet);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
index 8dd2f5885c1..9f179937934 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
@@ -119,7 +119,7 @@ public class TableDeviceSchemaValidator {
}
// we need to truncate the tailing null
- private String[] parseDeviceIdArray(Object[] objects) {
+ public static String[] parseDeviceIdArray(Object[] objects) {
String[] strings = new String[objects.length];
int lastNonNullIndex = -1;
for (int i = 0; i < objects.length; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
index 61f993ee600..f8c12841ee9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
@@ -25,6 +25,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributio
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -72,6 +75,7 @@ public class AddExchangeNodes extends PlanVisitor<PlanNode,
DistributedPlanGener
ExchangeNode exchangeNode = new
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
exchangeNode.addChild(rewriteNode);
newNode.addChild(exchangeNode);
+ context.hasExchangeNode = true;
} else {
newNode.addChild(rewriteNode);
}
@@ -91,4 +95,24 @@ public class AddExchangeNodes extends PlanVisitor<PlanNode,
DistributedPlanGener
new NodeDistribution(SAME_WITH_ALL_CHILDREN,
node.getRegionReplicaSet()));
return node;
}
+
+ @Override
+ public PlanNode visitTableDeviceFetch(
+ TableDeviceFetchNode node, DistributedPlanGenerator.PlanContext context)
{
+ return processTableDeviceSourceNode(node, context);
+ }
+
+ @Override
+ public PlanNode visitTableDeviceQuery(
+ TableDeviceQueryNode node, DistributedPlanGenerator.PlanContext context)
{
+ return processTableDeviceSourceNode(node, context);
+ }
+
+ private PlanNode processTableDeviceSourceNode(
+ TableDeviceSourceNode node, DistributedPlanGenerator.PlanContext
context) {
+ context.nodeDistributionMap.put(
+ node.getPlanNodeId(),
+ new NodeDistribution(SAME_WITH_ALL_CHILDREN,
node.getRegionReplicaSet()));
+ return node;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 0bf5bb32a1b..61543eda7d1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -14,20 +14,19 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
-import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
@@ -45,6 +44,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.tsfile.file.metadata.IDeviceID;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -58,6 +59,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaValidator.parseDeviceIdArray;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
@@ -74,7 +77,16 @@ public class DistributedPlanGenerator
}
public List<PlanNode> genResult(PlanNode node, PlanContext context) {
- return node.accept(this, context);
+ List<PlanNode> res = node.accept(this, context);
+ if (res.size() == 1) {
+ return res;
+ } else if (res.size() > 1) {
+ CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+ res.forEach(collectNode::addChild);
+ return Collections.singletonList(collectNode);
+ } else {
+ throw new IllegalStateException("List<PlanNode>.size should >= 1, but
now is 0");
+ }
}
@Override
@@ -273,8 +285,6 @@ public class DistributedPlanGenerator
}
}
- context.hasExchangeNode = tableScanNodeMap.size() > 1;
-
List<PlanNode> resultTableScanNodeList = new ArrayList<>();
TRegionReplicaSet mostUsedDataRegion = null;
int maxDeviceEntrySizeOfTableScan = 0;
@@ -401,18 +411,10 @@ public class DistributedPlanGenerator
}
// ------------------- schema related interface
---------------------------------------------
-
@Override
- public List<PlanNode> visitSchemaQueryMerge(SchemaQueryMergeNode node,
PlanContext context) {
- return Collections.singletonList(
- addExchangeNodeForSchemaMerge(rewriteSchemaQuerySource(node, context),
context));
- }
-
- private SchemaQueryMergeNode rewriteSchemaQuerySource(
- SchemaQueryMergeNode node, PlanContext context) {
- SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
-
- String database = ((TableDeviceSourceNode)
node.getChildren().get(0)).getDatabase();
+ public List<PlanNode> visitTableDeviceQuery(TableDeviceQueryNode node,
PlanContext context) {
+ String database =
+ ROOT + "." + ((TableDeviceSourceNode)
node.getChildren().get(0)).getDatabase();
Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
analysis
.getSchemaPartitionInfo()
@@ -421,54 +423,77 @@ public class DistributedPlanGenerator
.forEach(
(deviceGroupId, schemaRegionReplicaSet) ->
schemaRegionSet.add(schemaRegionReplicaSet));
- for (PlanNode child : node.getChildren()) {
+ context.mostUsedDataRegion = schemaRegionSet.iterator().next();
+ if (schemaRegionSet.size() == 1) {
+ node.setRegionReplicaSet(schemaRegionSet.iterator().next());
+ return Collections.singletonList(node);
+ } else {
+ List<PlanNode> res = new ArrayList<>(schemaRegionSet.size());
for (TRegionReplicaSet schemaRegion : schemaRegionSet) {
- SourceNode clonedChild = (SourceNode) child.clone();
+ TableDeviceQueryNode clonedChild = (TableDeviceQueryNode) node.clone();
clonedChild.setPlanNodeId(queryId.genPlanNodeId());
clonedChild.setRegionReplicaSet(schemaRegion);
- root.addChild(clonedChild);
+ res.add(clonedChild);
}
+ return res;
}
- return root;
}
- private PlanNode addExchangeNodeForSchemaMerge(
- AbstractSchemaMergeNode node, PlanContext context) {
- node.getChildren()
- .forEach(
- child ->
- context.putNodeDistribution(
- child.getPlanNodeId(),
- new NodeDistribution(
- NodeDistributionType.NO_CHILD,
- ((SourceNode) child).getRegionReplicaSet())));
- NodeDistribution nodeDistribution =
- new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
- PlanNode newNode = node.clone();
-
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(),
context));
- context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
- node.getChildren()
- .forEach(
- child -> {
- if (!nodeDistribution
- .getRegion()
-
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
- ExchangeNode exchangeNode = new
ExchangeNode(queryId.genPlanNodeId());
- exchangeNode.setChild(child);
-
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.hasExchangeNode = true;
- newNode.addChild(exchangeNode);
- } else {
- newNode.addChild(child);
- }
- });
- return newNode;
- }
+ @Override
+ public List<PlanNode> visitTableDeviceFetch(TableDeviceFetchNode node,
PlanContext context) {
+ String database =
+ ROOT + "." + ((TableDeviceSourceNode)
node.getChildren().get(0)).getDatabase();
+ Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
+ SchemaPartition schemaPartition = analysis.getSchemaPartitionInfo();
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> databaseMap =
+ schemaPartition.getSchemaPartitionMap().get(database);
- private TRegionReplicaSet calculateSchemaRegionByChildren(
- List<PlanNode> children, PlanContext context) {
- // We always make the schemaRegion of SchemaMergeNode to be the same as
its first child.
- return
context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
+ databaseMap.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
schemaRegionSet.add(schemaRegionReplicaSet));
+
+ if (schemaRegionSet.size() == 1) {
+ context.mostUsedDataRegion = schemaRegionSet.iterator().next();
+ node.setRegionReplicaSet(context.mostUsedDataRegion);
+ return Collections.singletonList(node);
+ } else {
+ Map<TRegionReplicaSet, TableDeviceFetchNode> tableDeviceFetchMap = new
HashMap<>();
+
+ for (Object[] deviceIdArray : node.getDeviceIdList()) {
+ IDeviceID deviceID =
+
IDeviceID.Factory.DEFAULT_FACTORY.create(parseDeviceIdArray(deviceIdArray));
+ TRegionReplicaSet regionReplicaSet =
+ databaseMap.get(schemaPartition.calculateDeviceGroupId(deviceID));
+ tableDeviceFetchMap
+ .computeIfAbsent(
+ regionReplicaSet,
+ k ->
+ new TableDeviceFetchNode(
+ queryId.genPlanNodeId(),
+ node.getDatabase(),
+ node.getTableName(),
+ new ArrayList<>(),
+ node.getColumnHeaderList(),
+ regionReplicaSet))
+ .addDeviceId(deviceIdArray);
+ }
+
+ List<PlanNode> res = new ArrayList<>();
+ TRegionReplicaSet mostUsedDataRegion = null;
+ int maxDeviceEntrySizeOfTableScan = 0;
+ for (Map.Entry<TRegionReplicaSet, TableDeviceFetchNode> entry :
+ tableDeviceFetchMap.entrySet()) {
+ TRegionReplicaSet regionReplicaSet = entry.getKey();
+ TableDeviceFetchNode subTableDeviceFetchNode = entry.getValue();
+ res.add(subTableDeviceFetchNode);
+
+ if (subTableDeviceFetchNode.getDeviceIdList().size() >
maxDeviceEntrySizeOfTableScan) {
+ mostUsedDataRegion = regionReplicaSet;
+ maxDeviceEntrySizeOfTableScan =
subTableDeviceFetchNode.getDeviceIdList().size();
+ }
+ }
+ context.mostUsedDataRegion = mostUsedDataRegion;
+ return res;
+ }
}
public static class PlanContext {
@@ -485,9 +510,5 @@ public class DistributedPlanGenerator
public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
return this.nodeDistributionMap.get(nodeId);
}
-
- public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution
distribution) {
- this.nodeDistributionMap.put(nodeId, distribution);
- }
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
index 3bb5a292a94..b074e5bae16 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
@@ -41,7 +41,7 @@ public abstract class Partition {
seriesSlotExecutorName, seriesPartitionSlotNum);
}
- protected TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) {
+ public TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) {
return executor.getSeriesPartitionSlot(deviceID);
}