This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/TableModelGrammar_0627 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c7e2a13f7e38b3b44aeee7ac92e50b3c8e55b82d Author: Beyyes <[email protected]> AuthorDate: Wed Jul 3 15:05:36 2024 +0800 add more ut --- .../planner/distribute/AddExchangeNodes.java | 74 ++++++++- .../distribute/DistributedPlanGenerator.java | 133 ++++++++++++---- .../distribute/TableDistributionPlanner.java | 33 ++-- .../plan/relational/planner/node/CollectNode.java | 4 + .../plan/relational/analyzer/AnalyzerTest.java | 32 +++- .../analyzer/MockTableModelDataPartition.java | 166 ++++++++++++++++++++ .../relational/analyzer/MockTablePartition.java | 174 --------------------- .../plan/relational/analyzer/TestMatadata.java | 29 +++- .../iotdb/commons/partition/DataPartition.java | 17 +- 9 files changed, 426 insertions(+), 236 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java index e8ff90a5598..61f993ee600 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java @@ -19,4 +19,76 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; -public class AddExchangeNodes {} +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; + +import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN; +import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_SOME_CHILD; + +public class AddExchangeNodes extends PlanVisitor<PlanNode, DistributedPlanGenerator.PlanContext> { + + private final MPPQueryContext queryContext; + + public AddExchangeNodes(MPPQueryContext queryContext) { + this.queryContext = queryContext; + } + + public PlanNode addExchangeNodes(PlanNode node, DistributedPlanGenerator.PlanContext context) { + return node.accept(this, context); + } + + @Override + public PlanNode visitPlan(PlanNode node, DistributedPlanGenerator.PlanContext context) { + if (node instanceof WritePlanNode) { + return node; + } + + PlanNode newNode = node.clone(); + if (node.getChildren().size() == 1) { + newNode.addChild(node.getChildren().get(0).accept(this, context)); + context.nodeDistributionMap.put( + node.getPlanNodeId(), + new NodeDistribution( + SAME_WITH_ALL_CHILDREN, + context + .nodeDistributionMap + .get(node.getChildren().get(0).getPlanNodeId()) + .getRegion())); + return newNode; + } + + for (PlanNode child : node.getChildren()) { + PlanNode rewriteNode = child.accept(this, context); + + TRegionReplicaSet region = + context.nodeDistributionMap.get(rewriteNode.getPlanNodeId()).getRegion(); + if (!region.equals(context.mostUsedDataRegion)) { + ExchangeNode exchangeNode = new ExchangeNode(queryContext.getQueryId().genPlanNodeId()); + exchangeNode.addChild(rewriteNode); + newNode.addChild(exchangeNode); + } else { + newNode.addChild(rewriteNode); + } + } + + context.nodeDistributionMap.put( + node.getPlanNodeId(), + new NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedDataRegion)); + + return newNode; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, DistributedPlanGenerator.PlanContext context) { + context.nodeDistributionMap.put( + node.getPlanNodeId(), + new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet())); + return node; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java index 49f0b1257eb..3c605c78640 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java @@ -14,6 +14,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType; @@ -44,15 +45,18 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.google.common.collect.ImmutableList.toImmutableList; -import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN; +import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction; public class DistributedPlanGenerator @@ -90,6 +94,7 @@ public class DistributedPlanGenerator @Override public List<PlanNode> visitOutput(OutputNode outputNode, PlanContext context) { + // TODO only consider the order of IDs context.expectedOrderingScheme = new OrderingScheme( outputNode.getOutputSymbols(), @@ -152,6 +157,7 @@ public class DistributedPlanGenerator @Override public List<PlanNode> visitSort(SortNode sortNode, PlanContext context) { context.expectedOrderingScheme = sortNode.getOrderingScheme(); + context.hasSortNode = true; List<PlanNode> childrenNodes = sortNode.getChild().accept(this, context); if (childrenNodes.size() == 1) { @@ -234,43 +240,100 @@ public class DistributedPlanGenerator } } - int i = 0; - if (tableScanNodeMap.size() > 1) { - context.hasExchangeNode = true; - List<Symbol> orderBy = node.getOutputSymbols().subList(0, 1); - Map<Symbol, SortOrder> orderings = - Collections.singletonMap(node.getOutputSymbols().get(0), SortOrder.ASC_NULLS_LAST); - OrderingScheme orderingScheme = new OrderingScheme(orderBy, orderings); - MergeSortNode mergeSortNode = - new MergeSortNode( - queryContext.getQueryId().genPlanNodeId(), orderingScheme, node.getOutputSymbols()); - - for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : tableScanNodeMap.entrySet()) { - TRegionReplicaSet regionReplicaSet = entry.getKey(); - TableScanNode subTableScanNode = entry.getValue(); - subTableScanNode.setPlanNodeId(queryContext.getQueryId().genPlanNodeId()); - subTableScanNode.setRegionReplicaSet(regionReplicaSet); - context.nodeDistributionMap.put( - subTableScanNode.getPlanNodeId(), - new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet)); - - // TODO not use 0 replica set as root replica? - if (i == 0) { - mergeSortNode.addChild(subTableScanNode); - context.nodeDistributionMap.put( - mergeSortNode.getPlanNodeId(), - new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet)); + context.hasExchangeNode = tableScanNodeMap.size() > 1; + + List<PlanNode> tableScanNodeList = new ArrayList<>(); + TRegionReplicaSet mostUsedDataRegion = null; + int maxDeviceEntrySizeOfTableScan = 0; + for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : tableScanNodeMap.entrySet()) { + TRegionReplicaSet regionReplicaSet = entry.getKey(); + TableScanNode subTableScanNode = entry.getValue(); + subTableScanNode.setPlanNodeId(queryContext.getQueryId().genPlanNodeId()); + subTableScanNode.setRegionReplicaSet(regionReplicaSet); + tableScanNodeList.add(subTableScanNode); + + if (mostUsedDataRegion == null + || subTableScanNode.getDeviceEntries().size() > maxDeviceEntrySizeOfTableScan) { + mostUsedDataRegion = regionReplicaSet; + maxDeviceEntrySizeOfTableScan = subTableScanNode.getDeviceEntries().size(); + } + } + context.mostUsedDataRegion = mostUsedDataRegion; + + List<Symbol> newOrderingSymbols = new ArrayList<>(); + List<SortOrder> newSortOrders = new ArrayList<>(); + OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme; + + for (Symbol symbol : expectedOrderingScheme.getOrderBy()) { + if (!context.hasSortNode && TIME.equalsIgnoreCase(symbol.getName())) { + continue; + } + + if (!node.getIdAndAttributeIndexMap().containsKey(symbol)) { + break; + } + + newOrderingSymbols.add(symbol); + newSortOrders.add(expectedOrderingScheme.getOrdering(symbol)); + } + + List<Function<DeviceEntry, String>> orderingRules = new ArrayList<>(); + for (Symbol symbol : newOrderingSymbols) { + int idx = node.getIdAndAttributeIndexMap().get(symbol); + if (node.getAssignments().get(symbol).getColumnCategory() == TsTableColumnCategory.ID) { + // segments[0] is always tableName + orderingRules.add(deviceEntry -> (String) deviceEntry.getDeviceID().getSegments()[idx + 1]); + } else { + orderingRules.add(deviceEntry -> deviceEntry.getAttributeColumnValues().get(idx)); + } + } + + Comparator<DeviceEntry> comparator; + if (newSortOrders.get(0).isNullsFirst()) { + if (newSortOrders.get(0).isAscending()) { + comparator = Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))); + } else { + comparator = Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))).reversed(); + } + } else { + if (newSortOrders.get(0).isAscending()) { + comparator = Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))); + } else { + comparator = Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))).reversed(); + } + } + for (int i = 1; i < orderingRules.size(); i++) { + Comparator<DeviceEntry> thenComparator; + if (newSortOrders.get(i).isNullsFirst()) { + if (newSortOrders.get(i).isAscending()) { + thenComparator = Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))); + } else { + thenComparator = Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))).reversed(); + } + } else { + if (newSortOrders.get(i).isAscending()) { + thenComparator = Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))); } else { - ExchangeNode exchangeNode = new ExchangeNode(queryContext.getQueryId().genPlanNodeId()); - exchangeNode.addChild(subTableScanNode); - mergeSortNode.addChild(exchangeNode); + thenComparator = Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))).reversed(); } - i++; } - return Collections.singletonList(mergeSortNode); - } else { - return Collections.singletonList(tableScanNodeMap.entrySet().iterator().next().getValue()); + comparator = comparator.thenComparing(thenComparator); + } + + OrderingScheme newOrderingScheme = + new OrderingScheme( + newOrderingSymbols, + IntStream.range(0, newOrderingSymbols.size()) + .boxed() + .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))); + for (PlanNode planNode : tableScanNodeList) { + TableScanNode tableScanNode = (TableScanNode) planNode; + planNodeOrderingSchemeMap.put(tableScanNode.getPlanNodeId(), newOrderingScheme); + List<DeviceEntry> deviceEntries = tableScanNode.getDeviceEntries(); + deviceEntries.sort(comparator); } + + return tableScanNodeList; } private List<PlanNode> connectViaMergeSort( @@ -363,7 +426,9 @@ public class DistributedPlanGenerator public static class PlanContext { final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; boolean hasExchangeNode = false; + boolean hasSortNode = false; OrderingScheme expectedOrderingScheme; + TRegionReplicaSet mostUsedDataRegion; public PlanContext() { this.nodeDistributionMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java index 8eb0f0c90fd..c72f2594043 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; public class TableDistributionPlanner { @@ -50,16 +51,18 @@ public class TableDistributionPlanner { } public DistributedQueryPlan plan() { + + // generate table model distributed plan DistributedPlanGenerator.PlanContext planContext = new DistributedPlanGenerator.PlanContext(); List<PlanNode> distributedPlanResult = new DistributedPlanGenerator(mppQueryContext, analysis) .genResult(logicalQueryPlan.getRootNode(), planContext); + checkArgument(distributedPlanResult.size() == 1, "Root node must return only one"); - if (distributedPlanResult.size() != 1) { - throw new IllegalStateException("root node must return only one"); - } - - PlanNode outputNodeWithExchange = distributedPlanResult.get(0); + // add exchange node for distributed plan + PlanNode outputNodeWithExchange = + new AddExchangeNodes(mppQueryContext) + .addExchangeNodes(distributedPlanResult.get(0), planContext); if (analysis.getStatement() instanceof Query) { analysis .getRespDatasetHeader() @@ -71,17 +74,19 @@ public class TableDistributionPlanner { } adjustUpStream(outputNodeWithExchange, planContext); + // generate subPlan SubPlan subPlan = new SubPlanGenerator() .splitToSubPlan(logicalQueryPlan.getContext().getQueryId(), outputNodeWithExchange); subPlan.getPlanFragment().setRoot(true); + // generate fragment instances List<FragmentInstance> fragmentInstances = mppQueryContext.getQueryType() == QueryType.READ ? new TableModelQueryFragmentPlanner(subPlan, analysis, mppQueryContext).plan() : new WriteFragmentParallelPlanner(subPlan, analysis, mppQueryContext).parallelPlan(); - // Only execute this step for READ operation + // only execute this step for READ operation if (mppQueryContext.getQueryType() == QueryType.READ) { setSinkForRootInstance(subPlan, fragmentInstances); } @@ -124,32 +129,32 @@ public class TableDistributionPlanner { rootInstance.getFragment().setPlanNodeTree(sinkNode); } - private void adjustUpStream(PlanNode root, DistributedPlanGenerator.PlanContext exchangeContext) { - if (!exchangeContext.hasExchangeNode) { + private void adjustUpStream(PlanNode root, DistributedPlanGenerator.PlanContext context) { + if (!context.hasExchangeNode) { return; } - adjustUpStreamHelper(root, exchangeContext, new HashMap<>()); + adjustUpStreamHelper(root, context, new HashMap<>()); } private void adjustUpStreamHelper( PlanNode root, - DistributedPlanGenerator.PlanContext exchangeContext, + DistributedPlanGenerator.PlanContext context, Map<TRegionReplicaSet, IdentitySinkNode> regionNodemap) { for (PlanNode child : root.getChildren()) { - adjustUpStreamHelper(child, exchangeContext, regionNodemap); + adjustUpStreamHelper(child, context, regionNodemap); if (child instanceof ExchangeNode) { ExchangeNode exchangeNode = (ExchangeNode) child; + IdentitySinkNode identitySinkNode = regionNodemap.computeIfAbsent( - exchangeContext - .getNodeDistribution(exchangeNode.getChild().getPlanNodeId()) - .getRegion(), + context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion(), k -> new IdentitySinkNode(mppQueryContext.getQueryId().genPlanNodeId())); identitySinkNode.addChild(exchangeNode.getChild()); identitySinkNode.addDownStreamChannelLocation( new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString())); + exchangeNode.setChild(identitySinkNode); exchangeNode.setIndexOfUpstreamSinkHandle(identitySinkNode.getCurrentLastIndex()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java new file mode 100644 index 00000000000..85444b62264 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java @@ -0,0 +1,4 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +public class CollectNode { +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java index 77162d0a330..d10b3354605 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; @@ -41,11 +42,13 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; @@ -61,6 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector.NOOP; @@ -214,12 +218,34 @@ public class AnalyzerTest { Arrays.asList("time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", "s2", "s3"), tableScanNode.getOutputColumnNames()); assertEquals(9, tableScanNode.getAssignments().size()); - assertEquals(1, tableScanNode.getDeviceEntries().size()); + assertEquals(6, tableScanNode.getDeviceEntries().size()); assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); - assertEquals("(\"time\" > 1)", tableScanNode.getTimePredicate().get().toString()); + assertEquals( + "(\"time\" > 1)", tableScanNode.getTimePredicate().map(Expression::toString).orElse(null)); assertNull(tableScanNode.getPushDownPredicate()); assertEquals(ASC, tableScanNode.getScanOrder()); distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertEquals(3, distributedQueryPlan.getFragments().size()); + assertTrue( + distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0) + instanceof OutputNode); + OutputNode outputNode = + (OutputNode) + distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0); + assertTrue(outputNode.getChildren().get(0) instanceof MergeSortNode); + MergeSortNode mergeSortNode = (MergeSortNode) outputNode.getChildren().get(0); + assertEquals( + Arrays.asList("tag1", "tag2", "tag3", "attr1", "attr2"), + mergeSortNode.getOrderingScheme().getOrderBy().stream() + .map(Symbol::getName) + .collect(Collectors.toList())); + assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof TableScanNode); + assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); + TableScanNode tableScanNode = (TableScanNode) mergeSortNode.getChildren().get(1); + assertEquals(4, tableScanNode.getDeviceEntries().size()); + assertEquals(Arrays.asList(), tableScanNode.getDeviceEntries().stream().map(d -> d.getDeviceID().toString())); } @Test @@ -378,7 +404,7 @@ public class AnalyzerTest { @Test public void singleTableProjectTest() { // 1. project without filter - sql = "SELECT tag1, attr1, s1 FROM table1"; + sql = "SELECT time, tag1, attr1, s1 FROM table1"; actualAnalysis = analyzeSQL(sql, metadata); assertNotNull(actualAnalysis); assertEquals(1, actualAnalysis.getTables().size()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTableModelDataPartition.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTableModelDataPartition.java new file mode 100644 index 00000000000..90e9d71f46b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTableModelDataPartition.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.analyzer; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.SchemaPartition; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import com.google.common.collect.ImmutableMap; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockTableModelDataPartition { + + private static final SeriesPartitionExecutor EXECUTOR = + SeriesPartitionExecutor.getSeriesPartitionExecutor( + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); + + private static final String DB_NAME = "testdb"; + + static final String DEVICE_1 = "table1.beijing.A1.ZZ"; + static final String DEVICE_2 = "table1.beijing.A2.XX"; + static final String DEVICE_3 = "table1.shanghai.A3.YY"; + static final String DEVICE_4 = "table1.shanghai.B3.YY"; + static final String DEVICE_5 = "table1.shenzhen.B2.ZZ"; + static final String DEVICE_6 = "table1.shenzhen.B1.XX"; + + static final List<String> DEVICE_1_ATTRIBUTES = Arrays.asList("high", "big"); + static final List<String> DEVICE_2_ATTRIBUTES = Arrays.asList("high", "small"); + static final List<String> DEVICE_3_ATTRIBUTES = Arrays.asList("low", "small"); + static final List<String> DEVICE_4_ATTRIBUTES = Arrays.asList("low", "big"); + static final List<String> DEVICE_5_ATTRIBUTES = Arrays.asList("mid", "big"); + static final List<String> DEVICE_6_ATTRIBUTES = Arrays.asList("mid", "small"); + + private static final TRegionReplicaSet DATA_REGION_GROUP_1 = genDataRegionGroup(10, 1, 2); + private static final TRegionReplicaSet DATA_REGION_GROUP_2 = genDataRegionGroup(11, 3, 2); + private static final TRegionReplicaSet DATA_REGION_GROUP_3 = genDataRegionGroup(12, 2, 1); + private static final TRegionReplicaSet DATA_REGION_GROUP_4 = genDataRegionGroup(13, 1, 3); + + /* + * DataPartition: + * + * device1(startTime:0): DataRegionGroup_1, + * device2(startTime:0): DataRegionGroup_1, + * device3(startTime:0): DataRegionGroup_2, + * device4(startTime:0): DataRegionGroup_2, + * device5(startTime:0): DataRegionGroup_2, + * device5(startTime:100): DataRegionGroup_3, + * device6(startTime:0): DataRegionGroup_2, + * device6(startTime:100): DataRegionGroup_3, + */ + public static DataPartition constructDataPartition() { + DataPartition dataPartition = + new DataPartition( + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); + + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + dbPartitionMap = new HashMap<>(); + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> devicePartitionMap = + new HashMap<>(); + + List<TRegionReplicaSet> regionGroup1 = Collections.singletonList(DATA_REGION_GROUP_1); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> dataRegionMap1 = + Collections.singletonMap(new TTimePartitionSlot(0L), regionGroup1); + devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_1), dataRegionMap1); + devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_2), dataRegionMap1); + + List<TRegionReplicaSet> regionGroup2 = Collections.singletonList(DATA_REGION_GROUP_2); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> dataRegionMap2 = + Collections.singletonMap(new TTimePartitionSlot(0L), regionGroup2); + devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_3), dataRegionMap2); + devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_4), dataRegionMap2); + + List<TRegionReplicaSet> regionGroup3 = Collections.singletonList(DATA_REGION_GROUP_2); + List<TRegionReplicaSet> regionGroup4 = Collections.singletonList(DATA_REGION_GROUP_3); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> dataRegionMap3 = + ImmutableMap.<TTimePartitionSlot, List<TRegionReplicaSet>>builder() + .put(new TTimePartitionSlot(0L), regionGroup3) + .put(new TTimePartitionSlot(100L), regionGroup4) + .build(); + devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_5), dataRegionMap3); + devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_6), dataRegionMap3); + + dbPartitionMap.put(DB_NAME, devicePartitionMap); + dataPartition.setDataPartitionMap(dbPartitionMap); + + return dataPartition; + } + + public static SchemaPartition constructSchemaPartition() { + SchemaPartition schemaPartition = + new SchemaPartition( + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); + Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>(); + + TRegionReplicaSet schemaRegion1 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11), + Arrays.asList( + genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2"))); + + TRegionReplicaSet schemaRegion2 = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21), + Arrays.asList( + genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); + + Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); + schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_1), schemaRegion1); + schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_2), schemaRegion2); + schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_3), schemaRegion2); + schemaPartitionMap.put(DB_NAME, schemaRegionMap); + schemaPartition.setSchemaPartitionMap(schemaPartitionMap); + + return schemaPartition; + } + + private static TRegionReplicaSet genDataRegionGroup( + int regionGroupId, int dataNodeId1, int dataNodeId2) { + return new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, regionGroupId), + Arrays.asList( + genDataNodeLocation(dataNodeId1, String.format("192.0.%s.1", regionGroupId)), + genDataNodeLocation(dataNodeId2, String.format("192.0.%s.2", regionGroupId)))); + } + + private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) { + return new TDataNodeLocation() + .setDataNodeId(dataNodeId) + .setClientRpcEndPoint(new TEndPoint(ip, 9000)) + .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001)) + .setInternalEndPoint(new TEndPoint(ip, 9002)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java deleted file mode 100644 index ae24f4a219e..00000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.relational.analyzer; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.partition.DataPartition; -import org.apache.iotdb.commons.partition.SchemaPartition; -import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.iotdb.db.conf.IoTDBDescriptor; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class MockTablePartition { - - private static final SeriesPartitionExecutor EXECUTOR = - SeriesPartitionExecutor.getSeriesPartitionExecutor( - IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), - IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); - - private static final String DB_NAME = "root.testdb"; - - private static final String device1 = "root.testdb.d1"; - private static final String device2 = "root.testdb.d22"; - private static final String device3 = "root.testdb.d333"; - private static final String device4 = "root.testdb.d4444"; - private static final String device5 = "root.testdb.d55555"; - private static final String device6 = "root.testdb.d666666"; - - public static DataPartition constructDataPartition() { - TRegionReplicaSet dataRegion1 = - new TRegionReplicaSet( - new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), - Arrays.asList( - genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2"))); - - TRegionReplicaSet dataRegion2 = - new TRegionReplicaSet( - new TConsensusGroupId(TConsensusGroupType.DataRegion, 2), - Arrays.asList( - genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); - - TRegionReplicaSet dataRegion3 = - new TRegionReplicaSet( - new TConsensusGroupId(TConsensusGroupType.DataRegion, 3), - Arrays.asList( - genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2"))); - - TRegionReplicaSet dataRegion4 = - new TRegionReplicaSet( - new TConsensusGroupId(TConsensusGroupType.DataRegion, 4), - Arrays.asList( - genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2"))); - - DataPartition dataPartition = - new DataPartition( - IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), - IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); - - Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> - dataPartitionMap = new HashMap<>(); - Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap = - new HashMap<>(); - - List<TRegionReplicaSet> d1DataRegions = new ArrayList<>(); - d1DataRegions.add(dataRegion1); - d1DataRegions.add(dataRegion2); - Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>(); - d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions); - - List<TRegionReplicaSet> d2DataRegions = new ArrayList<>(); - d2DataRegions.add(dataRegion3); - Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>(); - d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions); - - List<TRegionReplicaSet> d3DataRegions = new ArrayList<>(); - d3DataRegions.add(dataRegion1); - d3DataRegions.add(dataRegion4); - Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>(); - d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions); - - List<TRegionReplicaSet> d4DataRegions = new ArrayList<>(); - d4DataRegions.add(dataRegion1); - d4DataRegions.add(dataRegion4); - Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>(); - d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions); - - List<TRegionReplicaSet> d5DataRegions = new ArrayList<>(); - d5DataRegions.add(dataRegion4); - Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new HashMap<>(); - d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions); - - List<TRegionReplicaSet> d6DataRegions = new ArrayList<>(); - d6DataRegions.add(dataRegion1); - d6DataRegions.add(dataRegion2); - Map<TTimePartitionSlot, List<TRegionReplicaSet>> d6DataRegionMap = new HashMap<>(); - d6DataRegionMap.put(new TTimePartitionSlot(), d6DataRegions); - - sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device1), d1DataRegionMap); - sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device2), d2DataRegionMap); - sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device3), d3DataRegionMap); - sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device4), d4DataRegionMap); - sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device5), d5DataRegionMap); - sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device6), d6DataRegionMap); - - dataPartitionMap.put(DB_NAME, sgPartitionMap); - dataPartition.setDataPartitionMap(dataPartitionMap); - - return dataPartition; - } - - public static SchemaPartition constructSchemaPartition() { - SchemaPartition schemaPartition = - new SchemaPartition( - IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), - IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); - Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>(); - - TRegionReplicaSet schemaRegion1 = - new TRegionReplicaSet( - new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11), - Arrays.asList( - genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2"))); - - TRegionReplicaSet schemaRegion2 = - new TRegionReplicaSet( - new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21), - Arrays.asList( - genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); - - Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); - schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device1), schemaRegion1); - schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device2), schemaRegion2); - schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device3), schemaRegion2); - schemaPartitionMap.put(DB_NAME, schemaRegionMap); - schemaPartition.setSchemaPartitionMap(schemaPartitionMap); - - return schemaPartition; - } - - private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) { - return new TDataNodeLocation() - .setDataNodeId(dataNodeId) - .setClientRpcEndPoint(new TEndPoint(ip, 9000)) - .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001)) - .setInternalEndPoint(new TEndPoint(ip, 9002)); - } -} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java index 55fbeacb86c..b20d1e0dc83 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java @@ -47,12 +47,23 @@ import org.apache.tsfile.read.common.type.BinaryType; import org.apache.tsfile.read.common.type.Type; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1_ATTRIBUTES; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2_ATTRIBUTES; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3_ATTRIBUTES; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4_ATTRIBUTES; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5_ATTRIBUTES; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6_ATTRIBUTES; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.getFunctionType; import static org.apache.tsfile.read.common.type.BinaryType.TEXT; import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; @@ -195,10 +206,13 @@ public class TestMatadata implements Metadata { QualifiedObjectName tableName, List<Expression> expressionList, List<String> attributeColumns) { - return Collections.singletonList( - new DeviceEntry( - new StringArrayDeviceID("root.testdb", "table1", "t1", "t2", "t3"), - Arrays.asList("a1", "a2"))); + return Arrays.asList( + new DeviceEntry(new StringArrayDeviceID(DEVICE_4.split("\\.")), DEVICE_4_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")), DEVICE_1_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_6.split("\\.")), DEVICE_6_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")), DEVICE_5_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_3.split("\\.")), DEVICE_3_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_2.split("\\.")), DEVICE_2_ATTRIBUTES)); } @Override @@ -277,9 +291,10 @@ public class TestMatadata implements Metadata { return isNumericType(left) && isNumericType(right); } - private static final DataPartition DATA_PARTITION = MockTablePartition.constructDataPartition(); + private static final DataPartition DATA_PARTITION = + MockTableModelDataPartition.constructDataPartition(); private static final SchemaPartition SCHEMA_PARTITION = - MockTablePartition.constructSchemaPartition(); + MockTableModelDataPartition.constructSchemaPartition(); private static IPartitionFetcher getFakePartitionFetcher() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index edeed746f13..1fe12378834 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -140,9 +140,20 @@ public class DataPartition extends Partition { * <p>The device id shall be [table, seg1, ....] */ public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( - String database, IDeviceID deviceID, Filter timeFilter) { - // TODO implement this interface, @Potato - throw new UnsupportedOperationException(); + String database, IDeviceID deviceId, Filter timeFilter) { + // TODO perfect this interface, @Potato + TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceId); + if (!dataPartitionMap.containsKey(database) + || !dataPartitionMap.get(database).containsKey(seriesPartitionSlot)) { + return Collections.singletonList(NOT_ASSIGNED); + } + return dataPartitionMap.get(database).get(seriesPartitionSlot).entrySet().stream() + .filter( + entry -> + TimePartitionUtils.satisfyPartitionStartTime(timeFilter, entry.getKey().startTime)) + .flatMap(entry -> entry.getValue().stream()) + .distinct() + .collect(toList()); } public List<TRegionReplicaSet> getDataRegionReplicaSet(
