This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch fix_table_limit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f0e9c3c7027aa01f5584ec085b5e8dcf08a1f151 Author: Beyyes <[email protected]> AuthorDate: Sat Aug 10 09:15:03 2024 +0800 add sort elimination optimize --- .../plan/relational/planner/LogicalPlanner.java | 4 +- .../distribute/TableDistributedPlanner.java | 18 +---- .../rule/EliminateLimitProjectWithTableScan.java | 61 ++++++++++++++++ .../rule/EliminateLimitWithTableScan.java | 50 +++++++++++++ .../optimizations/DistributedOptimizeFactory.java | 56 +++++++++++++++ ...izeFactory.java => LogicalOptimizeFactory.java} | 4 +- .../PushLimitOffsetIntoTableScan.java | 4 -- .../planner/optimizations/SortElimination.java | 11 --- .../plan/relational/analyzer/AnalyzerTest.java | 81 +++++++++++++++++++++- .../analyzer/LimitOffsetPushDownTest.java | 50 ++++++++++--- .../plan/relational/analyzer/TestMatadata.java | 22 ++++-- 11 files changed, 309 insertions(+), 52 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index dd0123db890..44310f9b7e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@ -40,7 +40,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.OptimizeFactory; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LogicalOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AbstractQueryDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; @@ -88,7 +88,7 @@ public class LogicalPlanner { this.sessionInfo = requireNonNull(sessionInfo, "session is null"); this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); this.planOptimizers = - new OptimizeFactory(new PlannerContext(metadata, new InternalTypeManager())) + new LogicalOptimizeFactory(new PlannerContext(metadata, new InternalTypeManager())) .getPlanOptimizers(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index bc6426f8b35..363bcde7a44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -30,19 +30,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.PlanOptimizersStatsCollector; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; -import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.IterativeOptimizer; -import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder; -import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithMergeSort; -import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithMergeSort; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DistributedOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; -import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SortElimination; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; -import com.google.common.collect.ImmutableSet; - -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -64,13 +57,8 @@ public class TableDistributedPlanner { this.logicalQueryPlan = logicalQueryPlan; this.mppQueryContext = mppQueryContext; this.optimizers = - Arrays.asList( - new IterativeOptimizer( - new PlannerContext(null, new InternalTypeManager()), - new RuleStatsRecorder(), - ImmutableSet.of( - new MergeLimitWithMergeSort(), new MergeLimitOverProjectWithMergeSort())), - new SortElimination()); + new DistributedOptimizeFactory(new PlannerContext(null, new InternalTypeManager())) + .getPlanOptimizers(); } public DistributedQueryPlan plan() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/EliminateLimitProjectWithTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/EliminateLimitProjectWithTableScan.java new file mode 100644 index 00000000000..24448b53ed3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/EliminateLimitProjectWithTableScan.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.project; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.tableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +public class EliminateLimitProjectWithTableScan implements Rule<LimitNode> { + private static final Capture<ProjectNode> PROJECT = newCapture(); + private static final Capture<TableScanNode> TABLE_SCAN_NODE = newCapture(); + + private static final Pattern<LimitNode> PATTERN = + limit() + .with( + source() + .matching( + project() + .capturedAs(PROJECT) + // .matching(ProjectNode::isIdentity) + .with(source().matching(tableScan().capturedAs(TABLE_SCAN_NODE))))); + + @Override + public Pattern<LimitNode> getPattern() { + return PATTERN; + } + + @Override + public Rule.Result apply(LimitNode parent, Captures captures, Rule.Context context) { + ProjectNode projectNode = captures.get(PROJECT); + TableScanNode tableScanNode = captures.get(TABLE_SCAN_NODE); + + if (parent.getCount() == tableScanNode.getPushDownLimit() + && !tableScanNode.isPushLimitToEachDevice()) { + return Rule.Result.ofPlanNode(projectNode); + } else { + return Rule.Result.empty(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/EliminateLimitWithTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/EliminateLimitWithTableScan.java new file mode 100644 index 00000000000..d0d806eacb3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/EliminateLimitWithTableScan.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.tableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +public class EliminateLimitWithTableScan implements Rule<LimitNode> { + private static final Capture<TableScanNode> CHILD = newCapture(); + + private static final Pattern<LimitNode> PATTERN = + limit().with(source().matching(tableScan().capturedAs(CHILD))); + + @Override + public Pattern<LimitNode> getPattern() { + return PATTERN; + } + + @Override + public Rule.Result apply(LimitNode parent, Captures captures, Rule.Context context) { + TableScanNode tableScanNode = captures.get(CHILD); + + if (parent.getCount() == tableScanNode.getPushDownLimit() + && !tableScanNode.isPushLimitToEachDevice()) { + return Rule.Result.ofPlanNode(tableScanNode); + } else { + return Rule.Result.empty(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java new file mode 100644 index 00000000000..7c165318406 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.IterativeOptimizer; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.EliminateLimitProjectWithTableScan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.EliminateLimitWithTableScan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithMergeSort; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithMergeSort; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.List; + +public class DistributedOptimizeFactory { + private final List<PlanOptimizer> planOptimizers; + + public DistributedOptimizeFactory(PlannerContext plannerContext) { + IterativeOptimizer topKOptimizer = + new IterativeOptimizer( + plannerContext, + new RuleStatsRecorder(), + ImmutableSet.of( + new MergeLimitWithMergeSort(), new MergeLimitOverProjectWithMergeSort())); + + PlanOptimizer sortElimination = new SortElimination(); + + IterativeOptimizer limitElimination = + new IterativeOptimizer( + plannerContext, + new RuleStatsRecorder(), + ImmutableSet.of( + new EliminateLimitWithTableScan(), new EliminateLimitProjectWithTableScan())); + + this.planOptimizers = ImmutableList.of(topKOptimizer, sortElimination, limitElimination); + } + + public List<PlanOptimizer> getPlanOptimizers() { + return planOptimizers; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index 2a66a20b6d3..b2705b9e65a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -38,11 +38,11 @@ import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Set; -public class OptimizeFactory { +public class LogicalOptimizeFactory { private final List<PlanOptimizer> planOptimizers; - public OptimizeFactory(PlannerContext plannerContext) { + public LogicalOptimizeFactory(PlannerContext plannerContext) { PlanOptimizer simplifyExpressionOptimizer = new SimplifyExpressions(); PlanOptimizer pushPredicateIntoTableScanOptimizer = new PushPredicateIntoTableScan(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java index 08e5a33bdc0..3b0ee37034f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java @@ -261,10 +261,6 @@ public class PushLimitOffsetIntoTableScan implements PlanOptimizer { return pushLimitToEachDevice; } - public void setPushLimitToE5achDevice(boolean pushLimitToEachDevice) { - this.pushLimitToEachDevice = pushLimitToEachDevice; - } - public TableScanNode getTableScanNode() { return tableScanNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java index 96cb3f6fe05..41231ff7f27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java @@ -57,7 +57,6 @@ public class SortElimination implements PlanOptimizer { Context newContext = new Context(); PlanNode child = node.getChild().accept(this, newContext); OrderingScheme orderingScheme = node.getOrderingScheme(); - TableScanNode tableScanNode = newContext.getTableScanNode(); if (newContext.getTotalDeviceEntrySize() == 1 && TIMESTAMP_STR.equalsIgnoreCase(orderingScheme.getOrderBy().get(0).getName())) { return child; @@ -75,14 +74,12 @@ public class SortElimination implements PlanOptimizer { @Override public PlanNode visitTableScan(TableScanNode node, Context context) { context.addDeviceEntrySize(node.getDeviceEntries().size()); - context.setTableScanNode(node); return node; } } private static class Context { private int totalDeviceEntrySize = 0; - private TableScanNode tableScanNode; Context() {} @@ -93,13 +90,5 @@ public class SortElimination implements PlanOptimizer { public int getTotalDeviceEntrySize() { return totalDeviceEntrySize; } - - public TableScanNode getTableScanNode() { - return tableScanNode; - } - - public void setTableScanNode(TableScanNode tableScanNode) { - this.tableScanNode = tableScanNode; - } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java index b60ca014fab..ae9996eb3db 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java @@ -37,8 +37,10 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; @@ -48,7 +50,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSche import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.OperatorNotFoundException; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableHandle; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -136,8 +137,6 @@ public class AnalyzerTest { Metadata metadata = Mockito.mock(Metadata.class); Mockito.when(metadata.tableExists(Mockito.any())).thenReturn(true); - TableHandle tableHandle = Mockito.mock(TableHandle.class); - Map<String, ColumnHandle> map = new HashMap<>(); TableSchema tableSchema = Mockito.mock(TableSchema.class); Mockito.when(tableSchema.getTableName()).thenReturn("table1"); @@ -850,6 +849,82 @@ public class AnalyzerTest { .toString()); } + @Test + public void limitEliminationTest() { + sql = "SELECT s1+s3 FROM table1 limit 10"; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + actualAnalysis = analyzeSQL(sql, metadata); + logicalQueryPlan = + new LogicalPlanner(context, metadata, sessionInfo, warningCollector).plan(actualAnalysis); + // logical plan: `OutputNode - ProjectNode - LimitNode - TableScanNode` + rootNode = logicalQueryPlan.getRootNode(); + assertTrue(rootNode instanceof OutputNode); + assertTrue(getChildrenNode(rootNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(rootNode, 2) instanceof LimitNode); + assertTrue(getChildrenNode(rootNode, 3) instanceof TableScanNode); + // distributed plan: `IdentitySink - OutputNode - ProjectNode - LimitNode - CollectNode - + // TableScanNode`, `IdentitySink - TableScan` + distributionPlanner = new TableDistributedPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertTrue( + getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4) + instanceof CollectNode); + CollectNode collectNode = + (CollectNode) + getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4); + assertTrue(collectNode.getChildren().get(1) instanceof TableScanNode); + tableScanNode = (TableScanNode) collectNode.getChildren().get(1); + assertEquals(10, tableScanNode.getPushDownLimit()); + assertFalse(tableScanNode.isPushLimitToEachDevice()); + assertTrue( + distributedQueryPlan.getFragments().get(0).getPlanNodeTree() instanceof IdentitySinkNode); + IdentitySinkNode identitySinkNode = + (IdentitySinkNode) distributedQueryPlan.getFragments().get(1).getPlanNodeTree(); + tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 1); + assertEquals(10, tableScanNode.getPushDownLimit()); + assertFalse(tableScanNode.isPushLimitToEachDevice()); + + sql = "SELECT s1,s1+s3 FROM table1 where tag1='beijing' and tag2='A1' limit 10"; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + actualAnalysis = analyzeSQL(sql, metadata); + logicalQueryPlan = + new LogicalPlanner(context, metadata, sessionInfo, warningCollector).plan(actualAnalysis); + // logical plan: `OutputNode - ProjectNode - LimitNode - TableScanNode` + rootNode = logicalQueryPlan.getRootNode(); + assertTrue(rootNode instanceof OutputNode); + assertTrue(getChildrenNode(rootNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(rootNode, 2) instanceof LimitNode); + assertTrue(getChildrenNode(rootNode, 3) instanceof TableScanNode); + // distributed plan: `IdentitySink - OutputNode - ProjectNode - TableScanNode` + distributionPlanner = new TableDistributedPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertTrue( + distributedQueryPlan.getFragments().get(0).getPlanNodeTree() instanceof IdentitySinkNode); + identitySinkNode = + (IdentitySinkNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree(); + assertTrue(getChildrenNode(identitySinkNode, 3) instanceof TableScanNode); + tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 3); + assertEquals(10, tableScanNode.getPushDownLimit()); + assertFalse(tableScanNode.isPushLimitToEachDevice()); + + sql = "SELECT diff(s1) FROM table1 where tag1='beijing' and tag2='A1' limit 10"; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + actualAnalysis = analyzeSQL(sql, metadata); + logicalQueryPlan = + new LogicalPlanner(context, metadata, sessionInfo, warningCollector).plan(actualAnalysis); + // logical plan: `OutputNode - ProjectNode - LimitNode - TableScanNode` + rootNode = logicalQueryPlan.getRootNode(); + // distributed plan: `IdentitySink - OutputNode - ProjectNode - LimitNode - TableScanNode` + distributionPlanner = new TableDistributedPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + List<PlanFragment> fragments = distributedQueryPlan.getFragments(); + identitySinkNode = (IdentitySinkNode) fragments.get(0).getPlanNodeTree(); + assertTrue(getChildrenNode(identitySinkNode, 3) instanceof LimitNode); + assertTrue(getChildrenNode(identitySinkNode, 4) instanceof TableScanNode); + tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 4); + assertEquals(0, tableScanNode.getPushDownLimit()); + } + @Test public void duplicateProjectionsTest() { sql = "SELECT Time,time,s1+1,S1+1,tag1,TAG1 FROM table1"; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java index 09e3d763d56..d5045ac4163 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; @@ -103,9 +104,9 @@ public class LimitOffsetPushDownTest { assertFalse(tableScanNode.isPushLimitToEachDevice()); } - // order by all tags, limit can be pushed into TableScan, pushLimitToEachDevice==false + // order by all IDs, limit can be pushed into TableScan, pushLimitToEachDevice==false @Test - public void orderByAllTagsTest() { + public void orderByAllIDsTest() { sql = "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by tag2 desc, tag1 asc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit 10"; context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); @@ -117,30 +118,63 @@ public class LimitOffsetPushDownTest { // LogicalPlan: `Output - Offset - Limit - Project - StreamSort - Project - TableScan` assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode); - // DistributePlan: `IdentitySink - Output - Offset - Project - TopK - Limit - Project - - // TableScan` + // DistributePlan: `IdentitySink - Output - Offset - Project - TopK - Project - TableScan` distributionPlanner = new TableDistributedPlanner(actualAnalysis, logicalQueryPlan, context); distributedQueryPlan = distributionPlanner.plan(); assertEquals(3, distributedQueryPlan.getFragments().size()); TopKNode topKNode = (TopKNode) getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4); assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(topKNode.getChildren().get(1) instanceof LimitNode); + assertTrue(topKNode.getChildren().get(1) instanceof ProjectNode); assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode); - tableScanNode = (TableScanNode) getChildrenNode(topKNode.getChildren().get(1), 2); + tableScanNode = (TableScanNode) getChildrenNode(topKNode.getChildren().get(1), 1); assertEquals(4, tableScanNode.getDeviceEntries().size()); assertEquals(DESC, tableScanNode.getScanOrder()); assertTrue(tableScanNode.getPushDownLimit() == 15 && tableScanNode.getPushDownOffset() == 0); assertFalse(tableScanNode.isPushLimitToEachDevice()); - // `Identity - Limit - Project - TableScan` + // `Identity - Project - TableScan` tableScanNode = (TableScanNode) - getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 3); + getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 2); assertEquals(2, tableScanNode.getDeviceEntries().size()); assertEquals(DESC, tableScanNode.getScanOrder()); assertTrue(tableScanNode.getPushDownLimit() == 15 && tableScanNode.getPushDownOffset() == 0); assertFalse(tableScanNode.isPushLimitToEachDevice()); + + sql = "SELECT * FROM table1 order by tag2 desc, tag1 asc, attr1 desc, tag3 desc limit 10"; + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + actualAnalysis = analyzeSQL(sql, metadata); + logicalQueryPlan = + new LogicalPlanner(context, metadata, sessionInfo, WarningCollector.NOOP) + .plan(actualAnalysis); + rootNode = logicalQueryPlan.getRootNode(); + // LogicalPlan: `Output - Limit - StreamSort - TableScan` + assertTrue(getChildrenNode(rootNode, 3) instanceof TableScanNode); + + // DistributePlan: `IdentitySink - Output - TopK - TableScan` + distributionPlanner = new TableDistributedPlanner(actualAnalysis, logicalQueryPlan, context); + distributedQueryPlan = distributionPlanner.plan(); + assertEquals(3, distributedQueryPlan.getFragments().size()); + topKNode = + (TopKNode) getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 2); + assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode); + assertTrue(topKNode.getChildren().get(1) instanceof TableScanNode); + assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode); + tableScanNode = (TableScanNode) topKNode.getChildren().get(1); + assertEquals(4, tableScanNode.getDeviceEntries().size()); + assertEquals(ASC, tableScanNode.getScanOrder()); + assertTrue(tableScanNode.getPushDownLimit() == 10 && tableScanNode.getPushDownOffset() == 0); + assertFalse(tableScanNode.isPushLimitToEachDevice()); + + // `Identity - TableScan` + tableScanNode = + (TableScanNode) + distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0); + assertEquals(2, tableScanNode.getDeviceEntries().size()); + assertEquals(ASC, tableScanNode.getScanOrder()); + assertTrue(tableScanNode.getPushDownLimit() == 10 && tableScanNode.getPushDownOffset() == 0); + assertFalse(tableScanNode.isPushLimitToEachDevice()); } // order by some tags, limit can be pushed into TableScan, pushLimitToEachDevice==true diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java index c98178d79c0..27b2ab58668 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java @@ -47,6 +47,7 @@ import org.apache.tsfile.read.common.type.StringType; import org.apache.tsfile.read.common.type.Type; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -207,13 +208,20 @@ public class TestMatadata implements Metadata { List<Expression> expressionList, List<String> attributeColumns, MPPQueryContext context) { - return Arrays.asList( - new DeviceEntry(new StringArrayDeviceID(DEVICE_4.split("\\.")), DEVICE_4_ATTRIBUTES), - new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")), DEVICE_1_ATTRIBUTES), - new DeviceEntry(new StringArrayDeviceID(DEVICE_6.split("\\.")), DEVICE_6_ATTRIBUTES), - new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")), DEVICE_5_ATTRIBUTES), - new DeviceEntry(new StringArrayDeviceID(DEVICE_3.split("\\.")), DEVICE_3_ATTRIBUTES), - new DeviceEntry(new StringArrayDeviceID(DEVICE_2.split("\\.")), DEVICE_2_ATTRIBUTES)); + if (expressionList.size() == 2 + && expressionList.get(0).toString().equals("(\"tag1\" = 'beijing')") + && expressionList.get(1).toString().equals("(\"tag2\" = 'A1')")) { + return Collections.singletonList( + new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")), DEVICE_1_ATTRIBUTES)); + } else { + return Arrays.asList( + new DeviceEntry(new StringArrayDeviceID(DEVICE_4.split("\\.")), DEVICE_4_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")), DEVICE_1_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_6.split("\\.")), DEVICE_6_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")), DEVICE_5_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_3.split("\\.")), DEVICE_3_ATTRIBUTES), + new DeviceEntry(new StringArrayDeviceID(DEVICE_2.split("\\.")), DEVICE_2_ATTRIBUTES)); + } } @Override
