This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/RefineSchemaNode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4a37a9aadb6e9b567aa7fc9f2e157d6e6bb630e1 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jul 9 18:39:06 2024 +0800 Correct the way Schema related PlanNode in DistributionPlanner --- .../plan/planner/LocalExecutionPlanner.java | 36 ++--- .../plan/planner/TableOperatorGenerator.java | 19 --- .../node/metedata/read/TableDeviceFetchNode.java | 6 +- .../node/metedata/read/TableDeviceSourceNode.java | 23 ++++ .../fetcher/TableDeviceSchemaValidator.java | 2 +- .../planner/distribute/AddExchangeNodes.java | 24 ++++ .../distribute/DistributedPlanGenerator.java | 147 ++++++++++++--------- .../apache/iotdb/commons/partition/Partition.java | 2 +- 8 files changed, 159 insertions(+), 100 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 3b74f1b4a08..1f794df63fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -92,20 +92,7 @@ public class LocalExecutionPlanner { LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext, dataNodeQueryContext); - // Generate pipelines, return the last pipeline data structure - // TODO Replace operator with operatorFactory to build multiple driver for one pipeline - Operator root; - IClientSession.SqlDialect sqlDialect = instanceContext.getSessionInfo().getSqlDialect(); - switch (sqlDialect) { - case TREE: - root = plan.accept(new OperatorTreeGenerator(), context); - break; - case TABLE: - root = plan.accept(new TableOperatorGenerator(metadata), context); - break; - default: - throw new IllegalArgumentException(String.format("Unknown sql dialect: %s", sqlDialect)); - } + Operator root = generateOperator(instanceContext, context, plan); PipelineMemoryEstimator memoryEstimator = context.constructPipelineMemoryEstimator(root, null, plan, -1); @@ -136,7 +123,7 @@ public class LocalExecutionPlanner { LocalExecutionPlanContext context = new LocalExecutionPlanContext(instanceContext, schemaRegion); - Operator root = plan.accept(new OperatorTreeGenerator(), context); + Operator root = generateOperator(instanceContext, context, plan); PipelineMemoryEstimator memoryEstimator = context.constructPipelineMemoryEstimator(root, null, plan, -1); @@ -154,6 +141,25 @@ public class LocalExecutionPlanner { return context.getPipelineDriverFactories(); } + private Operator generateOperator( + FragmentInstanceContext instanceContext, LocalExecutionPlanContext context, PlanNode node) { + // Generate pipelines, return the last pipeline data structure + // TODO Replace operator with operatorFactory to build multiple driver for one pipeline + Operator root; + IClientSession.SqlDialect sqlDialect = instanceContext.getSessionInfo().getSqlDialect(); + switch (sqlDialect) { + case TREE: + root = node.accept(new OperatorTreeGenerator(), context); + break; + case TABLE: + root = node.accept(new TableOperatorGenerator(metadata), context); + break; + default: + throw new IllegalArgumentException(String.format("Unknown sql dialect: %s", sqlDialect)); + } + return root; + } + private long checkMemory( final PipelineMemoryEstimator memoryEstimator, FragmentInstanceStateMachine stateMachine) throws MemoryNotEnoughException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index a26f284e0d1..b1ba6ded7f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -40,7 +40,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator; -import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryMergeOperator; import org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory; import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; @@ -53,7 +52,6 @@ import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; @@ -779,23 +777,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()); } - @Override - public Operator visitSchemaQueryMerge( - SchemaQueryMergeNode node, LocalExecutionPlanContext context) { - List<Operator> children = new ArrayList<>(node.getChildren().size()); - for (PlanNode child : node.getChildren()) { - children.add(child.accept(this, context)); - } - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - SchemaQueryMergeOperator.class.getSimpleName()); - return new SchemaQueryMergeOperator(operatorContext, children); - } - @Override public Operator visitTableDeviceFetch( TableDeviceFetchNode node, LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java index 28a06fda562..d8e0f5e9028 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java @@ -38,7 +38,7 @@ import java.util.stream.Collectors; public class TableDeviceFetchNode extends TableDeviceSourceNode { - private List<Object[]> deviceIdList; + private final List<Object[]> deviceIdList; public TableDeviceFetchNode( PlanNodeId id, @@ -51,6 +51,10 @@ public class TableDeviceFetchNode extends TableDeviceSourceNode { this.deviceIdList = deviceIdList; } + public void addDeviceId(Object[] deviceId) { + deviceIdList.add(deviceId); + } + public List<Object[]> getDeviceIdList() { return deviceIdList; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java index 0bc6ee2da94..f2cb19041c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; public abstract class TableDeviceSourceNode extends SourceNode { @@ -97,4 +98,26 @@ public abstract class TableDeviceSourceNode extends SourceNode { public int allowedChildCount() { return 0; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + TableDeviceSourceNode that = (TableDeviceSourceNode) o; + return Objects.equals(database, that.database) + && Objects.equals(tableName, that.tableName) + && Objects.equals(schemaRegionReplicaSet, that.schemaRegionReplicaSet); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, tableName, schemaRegionReplicaSet); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java index 8dd2f5885c1..9f179937934 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java @@ -119,7 +119,7 @@ public class TableDeviceSchemaValidator { } // we need to truncate the tailing null - private String[] parseDeviceIdArray(Object[] objects) { + public static String[] parseDeviceIdArray(Object[] objects) { String[] strings = new String[objects.length]; int lastNonNullIndex = -1; for (int i = 0; i < objects.length; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java index 61f993ee600..f8c12841ee9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java @@ -25,6 +25,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributio import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -72,6 +75,7 @@ public class AddExchangeNodes extends PlanVisitor<PlanNode, DistributedPlanGener ExchangeNode exchangeNode = new ExchangeNode(queryContext.getQueryId().genPlanNodeId()); exchangeNode.addChild(rewriteNode); newNode.addChild(exchangeNode); + context.hasExchangeNode = true; } else { newNode.addChild(rewriteNode); } @@ -91,4 +95,24 @@ public class AddExchangeNodes extends PlanVisitor<PlanNode, DistributedPlanGener new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet())); return node; } + + @Override + public PlanNode visitTableDeviceFetch( + TableDeviceFetchNode node, DistributedPlanGenerator.PlanContext context) { + return processTableDeviceSourceNode(node, context); + } + + @Override + public PlanNode visitTableDeviceQuery( + TableDeviceQueryNode node, DistributedPlanGenerator.PlanContext context) { + return processTableDeviceSourceNode(node, context); + } + + private PlanNode processTableDeviceSourceNode( + TableDeviceSourceNode node, DistributedPlanGenerator.PlanContext context) { + context.nodeDistributionMap.put( + node.getPlanNodeId(), + new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet())); + return node; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java index 0bf5bb32a1b..61543eda7d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java @@ -14,20 +14,19 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; -import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; @@ -45,6 +44,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -58,6 +59,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaValidator.parseDeviceIdArray; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction; import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR; @@ -74,7 +77,16 @@ public class DistributedPlanGenerator } public List<PlanNode> genResult(PlanNode node, PlanContext context) { - return node.accept(this, context); + List<PlanNode> res = node.accept(this, context); + if (res.size() == 1) { + return res; + } else if (res.size() > 1) { + CollectNode collectNode = new CollectNode(queryId.genPlanNodeId()); + res.forEach(collectNode::addChild); + return Collections.singletonList(collectNode); + } else { + throw new IllegalStateException("List<PlanNode>.size should >= 1, but now is 0"); + } } @Override @@ -273,8 +285,6 @@ public class DistributedPlanGenerator } } - context.hasExchangeNode = tableScanNodeMap.size() > 1; - List<PlanNode> resultTableScanNodeList = new ArrayList<>(); TRegionReplicaSet mostUsedDataRegion = null; int maxDeviceEntrySizeOfTableScan = 0; @@ -401,18 +411,10 @@ public class DistributedPlanGenerator } // ------------------- schema related interface --------------------------------------------- - @Override - public List<PlanNode> visitSchemaQueryMerge(SchemaQueryMergeNode node, PlanContext context) { - return Collections.singletonList( - addExchangeNodeForSchemaMerge(rewriteSchemaQuerySource(node, context), context)); - } - - private SchemaQueryMergeNode rewriteSchemaQuerySource( - SchemaQueryMergeNode node, PlanContext context) { - SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone(); - - String database = ((TableDeviceSourceNode) node.getChildren().get(0)).getDatabase(); + public List<PlanNode> visitTableDeviceQuery(TableDeviceQueryNode node, PlanContext context) { + String database = + ROOT + "." + ((TableDeviceSourceNode) node.getChildren().get(0)).getDatabase(); Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>(); analysis .getSchemaPartitionInfo() @@ -421,54 +423,77 @@ public class DistributedPlanGenerator .forEach( (deviceGroupId, schemaRegionReplicaSet) -> schemaRegionSet.add(schemaRegionReplicaSet)); - for (PlanNode child : node.getChildren()) { + context.mostUsedDataRegion = schemaRegionSet.iterator().next(); + if (schemaRegionSet.size() == 1) { + node.setRegionReplicaSet(schemaRegionSet.iterator().next()); + return Collections.singletonList(node); + } else { + List<PlanNode> res = new ArrayList<>(schemaRegionSet.size()); for (TRegionReplicaSet schemaRegion : schemaRegionSet) { - SourceNode clonedChild = (SourceNode) child.clone(); + TableDeviceQueryNode clonedChild = (TableDeviceQueryNode) node.clone(); clonedChild.setPlanNodeId(queryId.genPlanNodeId()); clonedChild.setRegionReplicaSet(schemaRegion); - root.addChild(clonedChild); + res.add(clonedChild); } + return res; } - return root; } - private PlanNode addExchangeNodeForSchemaMerge( - AbstractSchemaMergeNode node, PlanContext context) { - node.getChildren() - .forEach( - child -> - context.putNodeDistribution( - child.getPlanNodeId(), - new NodeDistribution( - NodeDistributionType.NO_CHILD, - ((SourceNode) child).getRegionReplicaSet()))); - NodeDistribution nodeDistribution = - new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN); - PlanNode newNode = node.clone(); - nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), context)); - context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution); - node.getChildren() - .forEach( - child -> { - if (!nodeDistribution - .getRegion() - .equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) { - ExchangeNode exchangeNode = new ExchangeNode(queryId.genPlanNodeId()); - exchangeNode.setChild(child); - exchangeNode.setOutputColumnNames(child.getOutputColumnNames()); - context.hasExchangeNode = true; - newNode.addChild(exchangeNode); - } else { - newNode.addChild(child); - } - }); - return newNode; - } + @Override + public List<PlanNode> visitTableDeviceFetch(TableDeviceFetchNode node, PlanContext context) { + String database = + ROOT + "." + ((TableDeviceSourceNode) node.getChildren().get(0)).getDatabase(); + Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>(); + SchemaPartition schemaPartition = analysis.getSchemaPartitionInfo(); + Map<TSeriesPartitionSlot, TRegionReplicaSet> databaseMap = + schemaPartition.getSchemaPartitionMap().get(database); - private TRegionReplicaSet calculateSchemaRegionByChildren( - List<PlanNode> children, PlanContext context) { - // We always make the schemaRegion of SchemaMergeNode to be the same as its first child. - return context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion(); + databaseMap.forEach( + (deviceGroupId, schemaRegionReplicaSet) -> schemaRegionSet.add(schemaRegionReplicaSet)); + + if (schemaRegionSet.size() == 1) { + context.mostUsedDataRegion = schemaRegionSet.iterator().next(); + node.setRegionReplicaSet(context.mostUsedDataRegion); + return Collections.singletonList(node); + } else { + Map<TRegionReplicaSet, TableDeviceFetchNode> tableDeviceFetchMap = new HashMap<>(); + + for (Object[] deviceIdArray : node.getDeviceIdList()) { + IDeviceID deviceID = + IDeviceID.Factory.DEFAULT_FACTORY.create(parseDeviceIdArray(deviceIdArray)); + TRegionReplicaSet regionReplicaSet = + databaseMap.get(schemaPartition.calculateDeviceGroupId(deviceID)); + tableDeviceFetchMap + .computeIfAbsent( + regionReplicaSet, + k -> + new TableDeviceFetchNode( + queryId.genPlanNodeId(), + node.getDatabase(), + node.getTableName(), + new ArrayList<>(), + node.getColumnHeaderList(), + regionReplicaSet)) + .addDeviceId(deviceIdArray); + } + + List<PlanNode> res = new ArrayList<>(); + TRegionReplicaSet mostUsedDataRegion = null; + int maxDeviceEntrySizeOfTableScan = 0; + for (Map.Entry<TRegionReplicaSet, TableDeviceFetchNode> entry : + tableDeviceFetchMap.entrySet()) { + TRegionReplicaSet regionReplicaSet = entry.getKey(); + TableDeviceFetchNode subTableDeviceFetchNode = entry.getValue(); + res.add(subTableDeviceFetchNode); + + if (subTableDeviceFetchNode.getDeviceIdList().size() > maxDeviceEntrySizeOfTableScan) { + mostUsedDataRegion = regionReplicaSet; + maxDeviceEntrySizeOfTableScan = subTableDeviceFetchNode.getDeviceIdList().size(); + } + } + context.mostUsedDataRegion = mostUsedDataRegion; + return res; + } } public static class PlanContext { @@ -485,9 +510,5 @@ public class DistributedPlanGenerator public NodeDistribution getNodeDistribution(PlanNodeId nodeId) { return this.nodeDistributionMap.get(nodeId); } - - public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) { - this.nodeDistributionMap.put(nodeId, distribution); - } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java index 3bb5a292a94..b074e5bae16 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java @@ -41,7 +41,7 @@ public abstract class Partition { seriesSlotExecutorName, seriesPartitionSlotNum); } - protected TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) { + public TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) { return executor.getSeriesPartitionSlot(deviceID); }
