This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch merge_limits_filters in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 97484fff3235f7665bac78f8ee5d869a9d09a844 Author: Beyyes <[email protected]> AuthorDate: Wed Aug 28 19:20:42 2024 +0800 add MergeFilters, MergeLimits optimize rule --- .../planner/iterative/rule/MergeFilters.java | 49 +++++++ .../planner/iterative/rule/MergeLimits.java | 71 +++++++++ .../optimizations/LogicalOptimizeFactory.java | 13 +- .../plan/relational/analyzer/SortTest.java | 26 +--- .../plan/relational/analyzer/SubQueryTest.java | 38 ++++- .../plan/relational/analyzer/TestUtils.java | 160 +++++++++++++++++++++ 6 files changed, 327 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeFilters.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeFilters.java new file mode 100644 index 00000000000..6d620046eaf --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeFilters.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.combineConjuncts; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +public class MergeFilters implements Rule<FilterNode> { + private static final Capture<FilterNode> CHILD = newCapture(); + + private static final Pattern<FilterNode> PATTERN = + filter().with(source().matching(filter().capturedAs(CHILD))); + + @Override + public Pattern<FilterNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(FilterNode parent, Captures captures, Context context) { + FilterNode child = captures.get(CHILD); + + return Result.ofPlanNode( + new FilterNode( + parent.getPlanNodeId(), + child.getChild(), + combineConjuncts(child.getPredicate(), parent.getPredicate()))); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimits.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimits.java new file mode 100644 index 00000000000..2ef6116efa4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimits.java @@ -0,0 +1,71 @@ +/* + * Licensed under 
the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +/** + * This rule handles both LimitNode with ties and LimitNode without ties. The parent LimitNode is + * without ties. + * + * <p>If the child LimitNode is without ties, both nodes are merged into a single LimitNode with row + * count being the minimum of their row counts: + * + * <pre> + * - Limit (3) + * - Limit (5) + * </pre> + * + * is transformed into: + * + * <pre> + * - Limit (3) + * </pre> + * + * <p>If the child LimitNode is with ties, the rule's behavior depends on both nodes' row count. 
If + * parent row count is lower than or equal to child row count, child node is removed from the plan. + */ +public class MergeLimits implements Rule<LimitNode> { + private static final Capture<LimitNode> CHILD = newCapture(); + + private static final Pattern<LimitNode> PATTERN = + limit() + // .matching(limit -> !limit.isWithTies()) + .with(source().matching(limit().capturedAs(CHILD))); + + @Override + public Pattern<LimitNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(LimitNode parent, Captures captures, Context context) { + LimitNode child = captures.get(CHILD); + + return Result.ofPlanNode( + new LimitNode( + parent.getPlanNodeId(), + child.getChild(), + Math.min(parent.getCount(), child.getCount()), + parent.getTiesResolvingScheme())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index b2705b9e65a..9241e1c8d16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -18,8 +18,10 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Iterati import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.InlineProjections; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeFilters; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFilterColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneLimitColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneOffsetColumns; @@ -61,12 +63,15 @@ public class LogicalOptimizeFactory { IterativeOptimizer columnPruningOptimizer = new IterativeOptimizer(plannerContext, new RuleStatsRecorder(), columnPruningRules); - IterativeOptimizer inlineProjectionsOptimizer = + IterativeOptimizer inlineProjectionLimitFiltersOptimizer = new IterativeOptimizer( plannerContext, new RuleStatsRecorder(), ImmutableSet.of( - new InlineProjections(plannerContext), new RemoveRedundantIdentityProjections())); + new InlineProjections(plannerContext), + new RemoveRedundantIdentityProjections(), + new MergeFilters(), + new MergeLimits())); IterativeOptimizer limitPushdownOptimizer = new IterativeOptimizer( @@ -86,11 +91,11 @@ public class LogicalOptimizeFactory { ImmutableList.of( simplifyExpressionOptimizer, columnPruningOptimizer, - inlineProjectionsOptimizer, + inlineProjectionLimitFiltersOptimizer, pushPredicateIntoTableScanOptimizer, // redo columnPrune and inlineProjections after pushPredicateIntoTableScan columnPruningOptimizer, - inlineProjectionsOptimizer, + inlineProjectionLimitFiltersOptimizer, limitPushdownOptimizer, pushLimitOffsetIntoTableScanOptimizer, transformSortToStreamSortOptimizer, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java index 02578aafc7c..a07678ccaaa 100644 --- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java @@ -49,10 +49,10 @@ import org.junit.Test; import java.time.ZoneId; import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.LimitOffsetPushDownTest.getChildrenNode; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.assertTableScan; import static org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC; import static org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.DESC; import static org.junit.Assert.assertEquals; @@ -747,28 +747,4 @@ public class SortTest { expectedPushDownOffset, isPushLimitToEachDevice); } - - public void assertStreamSortWithFilter( - Ordering expectedOrdering, - long expectedPushDownLimit, - long expectedPushDownOffset, - boolean isPushLimitToEachDevice) {} - - public static void assertTableScan( - TableScanNode tableScanNode, - List<String> deviceEntries, - Ordering ordering, - long pushLimit, - long pushOffset, - boolean pushLimitToEachDevice) { - assertEquals( - deviceEntries, - tableScanNode.getDeviceEntries().stream() - .map(d -> d.getDeviceID().toString()) - .collect(Collectors.toList())); - assertEquals(ordering, tableScanNode.getScanOrder()); - assertEquals(pushLimit, tableScanNode.getPushDownLimit()); - assertEquals(pushOffset, tableScanNode.getPushDownOffset()); - assertEquals(pushLimitToEachDevice, tableScanNode.isPushLimitToEachDevice()); - } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java index 7d4aa16f82a..f394827ab9f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java @@ -44,10 +44,16 @@ import java.util.List; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.LimitOffsetPushDownTest.getChildrenNode; -import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.assertTableScan; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.metadata; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.queryId; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.sessionInfo; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.ALL_DEVICE_ENTRIES; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.DEFAULT_WARNING; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.QUERY_CONTEXT; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.SESSION_INFO; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.TEST_MATADATA; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.assertNodeMatches; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.assertTableScan; import static org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -474,4 +480,34 @@ public class SubQueryTest { 0, true); } + + // test MergeFilters + @Test + public void subQueryTest5() { + sql = 
"SELECT * FROM (SELECT * FROM table1 WHERE s1>1) WHERE s2>2"; + analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT); + logicalPlanNode = + new LogicalPlanner(QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, DEFAULT_WARNING) + .plan(analysis) + .getRootNode(); + assertNodeMatches(logicalPlanNode, OutputNode.class, TableScanNode.class); + TableScanNode tableScanNode = (TableScanNode) getChildrenNode(logicalPlanNode, 1); + assertTableScan( + tableScanNode, ALL_DEVICE_ENTRIES, ASC, 0, 0, true, "((\"s1\" > 1) AND (\"s2\" > 2))"); + } + + @Test + public void subQueryTest6() { + sql = "SELECT * FROM (SELECT * FROM table1 limit 10) limit 5"; + analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT); + logicalPlanNode = + new LogicalPlanner(QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, DEFAULT_WARNING) + .plan(analysis) + .getRootNode(); + assertNodeMatches(logicalPlanNode, OutputNode.class, LimitNode.class, TableScanNode.class); + LimitNode limitNode = (LimitNode) getChildrenNode(logicalPlanNode, 1); + assertEquals(5, limitNode.getCount()); + TableScanNode tableScanNode = (TableScanNode) getChildrenNode(logicalPlanNode, 2); + assertTableScan(tableScanNode, ALL_DEVICE_ENTRIES, ASC, 5, 0, false); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestUtils.java new file mode 100644 index 00000000000..11cb62a59d7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestUtils.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.analyzer; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1; +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestUtils { + public static final WarningCollector DEFAULT_WARNING = WarningCollector.NOOP; + public static final QueryId QUERY_ID = new QueryId("test_query"); + public static final SessionInfo SESSION_INFO = + new SessionInfo( + 1L, + "iotdb-user", + ZoneId.systemDefault(), + IoTDBConstant.ClientVersion.V_1_0, + "db", + IClientSession.SqlDialect.TABLE); + public static final Metadata TEST_MATADATA = new TestMatadata(); + public static final MPPQueryContext QUERY_CONTEXT = + new MPPQueryContext("only for test", QUERY_ID, SESSION_INFO, null, null); + + public static final List<String> ALL_DEVICE_ENTRIES = + Arrays.asList(DEVICE_4, DEVICE_1, DEVICE_6, DEVICE_5, DEVICE_3, DEVICE_2); + public static final List<String> SHANGHAI_SHENZHEN_DEVICE_ENTRIES = + Arrays.asList(DEVICE_4, DEVICE_6, DEVICE_5, DEVICE_3); + public static final List<String> SHENZHEN_DEVICE_ENTRIES = Arrays.asList(DEVICE_6, DEVICE_5); + public static final List<String> BEIJING_A1_DEVICE_ENTRY = Collections.singletonList(DEVICE_1); + + public static void assertTableScan( + TableScanNode tableScanNode, + List<String> deviceEntries, + Ordering ordering, + long pushLimit, + long pushOffset, + boolean pushLimitToEachDevice, + String pushDownFilter) { + assertEquals( + deviceEntries, + tableScanNode.getDeviceEntries().stream() + .map(d -> d.getDeviceID().toString()) + 
.collect(Collectors.toList())); + assertEquals(ordering, tableScanNode.getScanOrder()); + assertEquals(pushLimit, tableScanNode.getPushDownLimit()); + assertEquals(pushOffset, tableScanNode.getPushDownOffset()); + if (tableScanNode.getPushDownLimit() > 0) { + assertEquals(pushLimitToEachDevice, tableScanNode.isPushLimitToEachDevice()); + } + if (!pushDownFilter.isEmpty()) { + assert tableScanNode.getPushDownPredicate() != null; + assertEquals(pushDownFilter, tableScanNode.getPushDownPredicate().toString()); + } + } + + public static void assertTableScan( + TableScanNode tableScanNode, + List<String> deviceEntries, + Ordering ordering, + long pushLimit, + long pushOffset, + boolean pushLimitToEachDevice) { + assertTableScan( + tableScanNode, deviceEntries, ordering, pushLimit, pushOffset, pushLimitToEachDevice, ""); + } + + public static void assertMergeSortNode(MergeSortNode mergeSortNode) { + assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); + } + + // public static void assertJoinNodeEquals( + // JoinNode joinNode, + // JoinNode.JoinType joinType, + // List<JoinNode.EquiJoinClause> joinCriteria, + // List<Symbol> leftOutputSymbols, + // List<Symbol> rightOutputSymbols) { + // assertEquals(joinType, joinNode.getJoinType()); + // assertEquals(joinCriteria, joinNode.getCriteria()); + // assertEquals(leftOutputSymbols, joinNode.getLeftOutputSymbols()); + // assertEquals(rightOutputSymbols, joinNode.getRightOutputSymbols()); + // } + + public static void assertNodeMatches(PlanNode node, Class... 
classes) { + int idx = 0; + for (Class clazz : classes) { + assertEquals(clazz, getChildrenNode(node, idx++).getClass()); + } + } + + // public static void assertAnalyzeSemanticException(String sql, String message) { + // try { + // SqlParser sqlParser = new SqlParser(); + // Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault()); + // SessionInfo session = + // new SessionInfo( + // 0, "test", ZoneId.systemDefault(), "testdb", IClientSession.SqlDialect.TABLE); + // analyzeStatementWithException(statement, TEST_MATADATA, QUERY_CONTEXT, sqlParser, + // session); + // fail("Fail test sql: " + sql); + // } catch (Exception e) { + // Assert.assertTrue(e.getMessage(), e.getMessage().contains(message)); + // } + // } + + public static List<Symbol> buildSymbols(String... names) { + return Arrays.stream(names).map(Symbol::of).collect(Collectors.toList()); + } + + public static PlanNode getChildrenNode(PlanNode root, int idx) { + PlanNode result = root; + for (int i = 1; i <= idx; i++) { + result = result.getChildren().get(0); + } + return result; + } +}
