This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/push_limit_to_table_scan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1313099273a03e4bfa79bed98436582898f0484 Author: Sh-Zh-7 <[email protected]> AuthorDate: Thu Feb 26 15:17:59 2026 +0800 TopK push down limit to table scan node prototype. --- .../it/db/it/IoTDBWindowFunction3IT.java | 124 +++++++++++++++++++++ .../plan/planner/TableOperatorGenerator.java | 44 +++++--- .../distribute/TableDistributedPlanGenerator.java | 38 ++++++- .../relational/planner/node/TopKRankingNode.java | 23 +++- .../planner/WindowFunctionOptimizationTest.java | 43 +++++++ 5 files changed, 253 insertions(+), 19 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java index d461f3a11fe..a0e23ea1996 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java @@ -51,6 +51,17 @@ public class IoTDBWindowFunction3IT { "insert into demo values (2021-01-01T09:10:00, 'd1', 1)", "insert into demo values (2021-01-01T09:08:00, 'd2', 2)", "insert into demo values (2021-01-01T09:15:00, 'd2', 4)", + "create table multi_tag (region string tag, plant string tag, temp double field)", + "insert into multi_tag values (2021-01-01T08:00:00, 'east', 'A', 10)", + "insert into multi_tag values (2021-01-01T09:00:00, 'east', 'A', 20)", + "insert into multi_tag values (2021-01-01T10:00:00, 'east', 'A', 15)", + "insert into multi_tag values (2021-01-01T11:00:00, 'east', 'A', 25)", + "insert into multi_tag values (2021-01-01T08:30:00, 'east', 'B', 30)", + "insert into multi_tag values (2021-01-01T09:30:00, 'east', 'B', 35)", + "insert into multi_tag values (2021-01-01T10:30:00, 'east', 'B', 32)", + "insert into multi_tag values (2021-01-01T07:00:00, 'west', 'C', 50)", + "insert into multi_tag values (2021-01-01T08:00:00, 'west', 'C', 55)", + "insert into multi_tag values 
(2021-01-01T09:00:00, 'west', 'C', 52)", "FLUSH", "CLEAR ATTRIBUTE CACHE", }; @@ -176,4 +187,117 @@ public class IoTDBWindowFunction3IT { retArray, DATABASE_NAME); } + + @Test + public void testTopKRankingOrderByTimeAsc() { + // PARTITION BY all tags + ORDER BY time ASC triggers limit push-down to DeviceTableScanNode + // and streaming RowNumberOperator optimization. + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY time ASC) as rn FROM demo) WHERE rn <= 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testTopKRankingOrderByTimeDesc() { + // ORDER BY time DESC: returns newest K rows per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo) WHERE rn <= 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testTopKRankingOrderByTimeLimit1() { + // rn <= 1: get exactly the oldest row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY time ASC) as rn FROM demo) WHERE rn <= 1 ORDER BY device", + expectedHeader, + retArray, + 
DATABASE_NAME); + } + + @Test + public void testTopKRankingOrderByTimeMultiTag() { + // Multi-tag table: PARTITION BY region, plant (all tags) ORDER BY time + String[] expectedHeader = new String[] {"time", "region", "plant", "temp", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T08:00:00.000Z,east,A,10.0,1,", + "2021-01-01T09:00:00.000Z,east,A,20.0,2,", + "2021-01-01T08:30:00.000Z,east,B,30.0,1,", + "2021-01-01T09:30:00.000Z,east,B,35.0,2,", + "2021-01-01T07:00:00.000Z,west,C,50.0,1,", + "2021-01-01T08:00:00.000Z,west,C,55.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY region, plant ORDER BY time ASC) as rn FROM multi_tag) WHERE rn <= 2 ORDER BY region, plant, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testTopKRankingOrderByTimeMultiTagDesc() { + // Multi-tag table: ORDER BY time DESC returns newest rows per device + String[] expectedHeader = new String[] {"time", "region", "plant", "temp", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T10:00:00.000Z,east,A,15.0,2,", + "2021-01-01T11:00:00.000Z,east,A,25.0,1,", + "2021-01-01T09:30:00.000Z,east,B,35.0,2,", + "2021-01-01T10:30:00.000Z,east,B,32.0,1,", + "2021-01-01T08:00:00.000Z,west,C,55.0,2,", + "2021-01-01T09:00:00.000Z,west,C,52.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY region, plant ORDER BY time DESC) as rn FROM multi_tag) WHERE rn <= 2 ORDER BY region, plant, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testTopKRankingOrderByTimeLimitExceedsRows() { + // rn <= 10 but d2 only has 2 rows - should return all available rows + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:09:00.000Z,d1,3.0,3,", + "2021-01-01T09:10:00.000Z,d1,1.0,4,", + 
"2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY time ASC) as rn FROM demo) WHERE rn <= 10 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1bc788251a7..cd23122e4b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -4323,6 +4323,35 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution List<Integer> partitionChannels = getChannelsForSymbols(partitionBySymbols, childLayout); List<TSDataType> inputDataTypes = getOutputColumnTypes(node.getChild(), context.getTypeProvider()); + + ImmutableList.Builder<Integer> outputChannels = ImmutableList.builder(); + for (int i = 0; i < inputDataTypes.size(); i++) { + outputChannels.add(i); + } + + // compute the layout of the output from the window operator + ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder(); + outputMappings.putAll(childLayout); + + if (!node.isPartial() || !partitionChannels.isEmpty()) { + int channel = inputDataTypes.size(); + outputMappings.put(node.getRankingSymbol(), channel); + } + + if (node.isDataPreSortedAndLimited()) { + // Data is already limited to K rows per partition and sorted by time. + // Use streaming RowNumberOperator (O(n) time, O(partitions) memory) + // instead of heap-based TopKRankingOperator (O(n log K) time, O(n) memory). 
+ return new RowNumberOperator( + operatorContext, + child, + inputDataTypes, + outputChannels.build(), + partitionChannels, + Optional.of(node.getMaxRankingPerPartition()), + 10_000); + } + List<TSDataType> partitionTypes = partitionChannels.stream().map(inputDataTypes::get).collect(toImmutableList()); @@ -4341,21 +4370,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution .collect(toImmutableList()); } - ImmutableList.Builder<Integer> outputChannels = ImmutableList.builder(); - for (int i = 0; i < inputDataTypes.size(); i++) { - outputChannels.add(i); - } - - // compute the layout of the output from the window operator - ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder(); - outputMappings.putAll(childLayout); - - if (!node.isPartial() || !partitionChannels.isEmpty()) { - // ranking function goes in the last channel - int channel = inputDataTypes.size(); - outputMappings.put(node.getRankingSymbol(), channel); - } - return new TopKRankingOperator( operatorContext, child, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7072b5f519f..52dad04df42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -442,6 +442,33 @@ public class TableDistributedPlanGenerator return false; } + private boolean tryPushTopKRankingLimitToScan( + TopKRankingNode topKRankingNode, List<PlanNode> children, OrderingScheme orderingScheme) { + List<Symbol> orderBy = orderingScheme.getOrderBy(); + if (orderBy.size() != 1) { + return false; + } + Symbol orderSymbol 
= orderBy.get(0); + long limit = topKRankingNode.getMaxRankingPerPartition(); + boolean pushed = false; + + for (PlanNode child : children) { + if (child instanceof DeviceTableScanNode && !(child instanceof AggregationTableScanNode)) { + DeviceTableScanNode scanNode = (DeviceTableScanNode) child; + if (scanNode.isTimeColumn(orderSymbol)) { + scanNode.setPushLimitToEachDevice(true); + if (scanNode.getPushDownLimit() <= 0) { + scanNode.setPushDownLimit(limit); + } else { + scanNode.setPushDownLimit(Math.min(limit, scanNode.getPushDownLimit())); + } + pushed = true; + } + } + } + return pushed; + } + @Override public List<PlanNode> visitGroup(GroupNode node, PlanContext context) { context.setExpectedOrderingScheme(node.getOrderingScheme()); @@ -1885,7 +1912,6 @@ public class TableDistributedPlanGenerator nodeOrderingMap.put(node.getPlanNodeId(), orderingScheme.get()); } - // TODO: per partition topk eliminate checkArgument( node.getChildren().size() == 1, "Size of TopKRankingNode can only be 1 in logical plan."); boolean canSplitPushDown = node.getChild() instanceof GroupNode; @@ -1894,12 +1920,20 @@ public class TableDistributedPlanGenerator } List<PlanNode> childrenNodes = node.getChildren().get(0).accept(this, context); if (canSplitPushDown) { + // visitGroup may return GroupNode-wrapped children (sort not eliminated) or bare + // DeviceTableScanNode (sort eliminated). Unwrap GroupNode/SortNode when present. childrenNodes = childrenNodes.stream() - .map(child -> child.getChildren().get(0)) + .map(child -> child instanceof SortNode ? 
child.getChildren().get(0) : child) .collect(Collectors.toList()); } + if (canSplitPushDown && orderingScheme.isPresent()) { + if (tryPushTopKRankingLimitToScan(node, childrenNodes, orderingScheme.get())) { + node.setDataPreSortedAndLimited(true); + } + } + if (childrenNodes.size() == 1) { node.setChild(childrenNodes.get(0)); return Collections.singletonList(node); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java index fedb7e1f2e1..fd41ab4ca70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java @@ -50,6 +50,11 @@ public class TopKRankingNode extends SingleChildProcessNode { private final int maxRankingPerPartition; private final boolean partial; + // When true, the child scan already returns pre-sorted, pre-limited data (at most K rows per + // partition, ordered correctly). The operator can skip heap-based TopK selection and just assign + // sequential row numbers (streaming mode). 
+ private boolean dataPreSortedAndLimited = false; + public TopKRankingNode( PlanNodeId id, DataOrganizationSpecification specification, @@ -119,6 +124,14 @@ public class TopKRankingNode extends SingleChildProcessNode { return rankingType; } + public boolean isDataPreSortedAndLimited() { + return dataPreSortedAndLimited; + } + + public void setDataPreSortedAndLimited(boolean dataPreSortedAndLimited) { + this.dataPreSortedAndLimited = dataPreSortedAndLimited; + } + @Override public List<String> getOutputColumnNames() { throw new UnsupportedOperationException(); @@ -132,6 +145,7 @@ public class TopKRankingNode extends SingleChildProcessNode { Symbol.serialize(rankingSymbol, byteBuffer); ReadWriteIOUtils.write(maxRankingPerPartition, byteBuffer); ReadWriteIOUtils.write(partial, byteBuffer); + ReadWriteIOUtils.write(dataPreSortedAndLimited, byteBuffer); } @Override @@ -142,6 +156,7 @@ public class TopKRankingNode extends SingleChildProcessNode { Symbol.serialize(rankingSymbol, stream); ReadWriteIOUtils.write(maxRankingPerPartition, stream); ReadWriteIOUtils.write(partial, stream); + ReadWriteIOUtils.write(dataPreSortedAndLimited, stream); } public static TopKRankingNode deserialize(ByteBuffer byteBuffer) { @@ -151,10 +166,14 @@ public class TopKRankingNode extends SingleChildProcessNode { Symbol rankingSymbol = Symbol.deserialize(byteBuffer); int maxRankingPerPartition = ReadWriteIOUtils.readInt(byteBuffer); boolean partial = ReadWriteIOUtils.readBoolean(byteBuffer); + boolean dataPreSortedAndLimited = ReadWriteIOUtils.readBoolean(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new TopKRankingNode( - planNodeId, specification, rankingType, rankingSymbol, maxRankingPerPartition, partial); + TopKRankingNode node = + new TopKRankingNode( + planNodeId, specification, rankingType, rankingSymbol, maxRankingPerPartition, partial); + node.setDataPreSortedAndLimited(dataPreSortedAndLimited); + return node; } @Override diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java index e31f2f7e580..6618ad518c5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java @@ -20,12 +20,19 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; @@ -297,4 +304,40 @@ public class WindowFunctionOptimizationTest { assertPlan(planTester.getFragmentPlan(1), rowNumber(tableScan)); assertPlan(planTester.getFragmentPlan(2), rowNumber(tableScan)); } + + @Test + public void testTopKRankingOrderByTimeLimitPushDown() { + PlanTester planTester = new PlanTester(); + + String sql = + "SELECT * FROM (SELECT *, 
row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) as rn FROM table1) WHERE rn <= 2"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + // Logical plan: OutputNode -> TopKRankingNode -> GroupNode -> TableScanNode + assertPlan(logicalQueryPlan, output(topKRanking(group(tableScan)))); + + // Distributed plan: TopKRankingNode pushed down to each partition with limit push-down. + // Fragment 0: OutputNode -> CollectNode -> ExchangeNodes + assertPlan( + planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); + + // Worker fragments: TopKRankingNode -> DeviceTableScanNode + // Verify limit is pushed to DeviceTableScanNode and TopKRankingNode is marked for streaming. + for (int i = 1; i <= 2; i++) { + PlanNode fragmentRoot = planTester.getFragmentPlan(i); + assertTrue( + "Fragment " + i + " root should be TopKRankingNode", + fragmentRoot instanceof TopKRankingNode); + TopKRankingNode topKNode = (TopKRankingNode) fragmentRoot; + + PlanNode scanChild = topKNode.getChild(); + assertNotNull("TopKRankingNode should have a child", scanChild); + assertTrue( + "Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); + DeviceTableScanNode dts = (DeviceTableScanNode) scanChild; + assertTrue("pushLimitToEachDevice should be true", dts.isPushLimitToEachDevice()); + assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit()); + } + } }
