This is an automated email from the ASF dual-hosted git repository. zhihao pushed a commit to branch perf/szh/window_func_limit_k_optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8736c58b2e747db1a9c883f170e78141e8974aa3 Author: Sh-Zh-7 <[email protected]> AuthorDate: Fri Feb 20 21:43:29 2026 +0800 Fix bug where a window function with ORDER BY time did not work when the outer query used ORDER BY time DESC. --- .../relational/it/db/it/IoTDBLimitKRankingIT.java | 156 +++++++++++++++++++++ .../relational/planner/iterative/rule/Util.java | 12 +- .../planner/optimizations/SortElimination.java | 43 +++++- .../planner/WindowFunctionOptimizationTest.java | 115 +++++++++++++++ 4 files changed, 315 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java index 59cfdedae6c..de90dd2009e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java @@ -244,4 +244,160 @@ public class IoTDBLimitKRankingIT { retArray, DATABASE_NAME); } + + // ==================== ORDER BY time DESC tests ==================== + + @Test + public void testFilterPushDownOrderByTimeDesc() { + // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time DESC) WHERE rn <= 2 + // Should use LimitKRankingOperator: take latest 2 rows per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void 
testFilterPushDownOrderByTimeDescK1() { + // K=1: only the latest row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 1 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescKLargerThanData() { + // K=100: larger than any partition, so all rows are returned + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,4,", + "2021-01-01T09:07:00.000Z,d1,5.0,3,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:01:00.000Z,d3,10.0,5,", + "2021-01-01T09:02:00.000Z,d3,20.0,4,", + "2021-01-01T09:03:00.000Z,d3,30.0,3,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 100 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testLimitPushDownOrderByTimeDesc() { + // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time DESC) LIMIT 4 + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:07:00.000Z,d1,5.0,3,", + "2021-01-01T09:05:00.000Z,d1,3.0,4,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, 
row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") ORDER BY device, time DESC LIMIT 4", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescWithLessThan() { + // Use rn < 3 instead of rn <= 2 (should give same result as rn <= 2) + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn < 3 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescSelectSubsetColumns() { + // Only select time, device, value (not the ranking column) + String[] expectedHeader = new String[] {"time", "device", "value"}; + String[] retArray = + new String[] { + "2021-01-01T09:07:00.000Z,d1,5.0,", + "2021-01-01T09:09:00.000Z,d1,3.0,", + "2021-01-01T09:10:00.000Z,d1,1.0,", + "2021-01-01T09:08:00.000Z,d2,2.0,", + "2021-01-01T09:15:00.000Z,d2,4.0,", + "2021-01-01T09:03:00.000Z,d3,30.0,", + "2021-01-01T09:04:00.000Z,d3,40.0,", + "2021-01-01T09:06:00.000Z,d3,50.0,", + }; + tableResultSetEqualTest( + "SELECT time, device, value FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 3 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescEqual() { + // rn = 2: only the second-latest row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + 
"2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn = 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java index b6a4d6aef72..b4227d11ce2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; -import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor; @@ -149,9 +148,10 @@ final class Util { } /** - * Returns true when the window is ROW_NUMBER with ORDER BY on a single ascending TIMESTAMP - * column. In IoTDB, data is naturally sorted by (device, time), so we can use the streaming - * LimitKRankingNode instead of the buffered TopKRankingNode. + * Returns true when the window is ROW_NUMBER with ORDER BY on a single TIMESTAMP column (ASC or + * DESC). 
In IoTDB, data is naturally sorted by (device, time) and table scans support both + * forward and reverse iteration, so we can use the streaming LimitKRankingNode instead of the + * buffered TopKRankingNode for both ASC (earliest K) and DESC (latest K). */ public static boolean canUseLimitKRanking( WindowNode windowNode, @@ -172,10 +172,6 @@ final class Util { } Symbol orderSymbol = orderBy.get(0); - SortOrder sortOrder = orderingScheme.get().getOrdering(orderSymbol); - if (!sortOrder.isAscending()) { - return false; - } Type type = symbolAllocator.getTypes().getTableModelType(orderSymbol); return type instanceof TimestampType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java index bb276f07150..bdc986e63da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode; @@ -31,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import java.util.Collections; @@ -72,10 +74,13 @@ public class SortElimination implements PlanOptimizer { OrderingScheme orderingScheme = node.getOrderingScheme(); if (context.canEliminateSort() && newContext.getTotalDeviceEntrySize() == 1 - && orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName())) { + && orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName()) + && timeSortDirectionMatchesScanOrder(orderingScheme, newContext)) { return child; } - return context.canEliminateSort() && node.isOrderByAllIdsAndTime() + return context.canEliminateSort() + && node.isOrderByAllIdsAndTime() + && timeSortDirectionMatchesScanOrder(orderingScheme, newContext) ? child : node.replaceChildren(Collections.singletonList(child)); } @@ -86,17 +91,39 @@ public class SortElimination implements PlanOptimizer { PlanNode child = node.getChild().accept(this, newContext); context.setCannotEliminateSort(newContext.cannotEliminateSort); return context.canEliminateSort() - && (node.isOrderByAllIdsAndTime() + && ((node.isOrderByAllIdsAndTime() + && timeSortDirectionMatchesScanOrder(node.getOrderingScheme(), newContext)) || node.getStreamCompareKeyEndIndex() == node.getOrderingScheme().getOrderBy().size() - 1) ? child : node.replaceChildren(Collections.singletonList(child)); } + /** + * Checks whether the sort direction on the time column matches the underlying scan order. When + * the scan is DESC (e.g. due to LimitKRanking with ORDER BY time DESC), a sort expecting ASC + * must not be eliminated. 
+ */ + private boolean timeSortDirectionMatchesScanOrder( + OrderingScheme orderingScheme, Context childContext) { + if (childContext.getTimeColumnName() == null) { + return true; + } + for (Symbol symbol : orderingScheme.getOrderBy()) { + if (symbol.getName().equals(childContext.getTimeColumnName())) { + SortOrder sortOrder = orderingScheme.getOrdering(symbol); + boolean scanAscending = childContext.getScanOrder() == Ordering.ASC; + return sortOrder.isAscending() == scanAscending; + } + } + return true; + } + @Override public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context context) { context.addDeviceEntrySize(node.getDeviceEntries().size()); context.setTimeColumnName(node.getTimeColumn().map(Symbol::getName).orElse(null)); + context.setScanOrder(node.getScanOrder()); return node; } @@ -152,6 +179,8 @@ public class SortElimination implements PlanOptimizer { private String timeColumnName = null; + private Ordering scanOrder = Ordering.ASC; + Context() {} public void addDeviceEntrySize(int deviceEntrySize) { @@ -177,5 +206,13 @@ public class SortElimination implements PlanOptimizer { public void setTimeColumnName(String timeColumnName) { this.timeColumnName = timeColumnName; } + + public Ordering getScanOrder() { + return scanOrder; + } + + public void setScanOrder(Ordering scanOrder) { + this.scanOrder = scanOrder; + } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java index ba55d94af9f..5f3dc0b3dcc 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java @@ -383,6 +383,121 @@ public class WindowFunctionOptimizationTest { 
assertPlan(planTester.getFragmentPlan(5), tableScan); } + @Test + public void testLimitKRankingPushDownFilterOrderByTimeDesc() { + PlanTester planTester = new PlanTester(); + + // ORDER BY time DESC triggers LimitKRankingNode instead of TopKRankingNode + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 3"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan)))); + + /* OutputNode + * └──LimitKRankingNode + * └──CollectNode + * ├──ExchangeNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), + output(limitKRanking(collect(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), tableScan); + assertPlan(planTester.getFragmentPlan(2), tableScan); + assertPlan(planTester.getFragmentPlan(3), tableScan); + } + + @Test + public void testLimitKRankingPushDownFilterOrderByTimeDescWithAllTags() { + PlanTester planTester = new PlanTester(); + + // All tag columns in PARTITION BY + ORDER BY time DESC → LimitKRankingNode with GroupNode push + // down + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 2"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitKRankingNode + * └──GroupNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan)))); + + /* OutputNode + * └──CollectNode + * ├──ExchangeNode + * │ └──LimitKRankingNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──LimitKRankingNode + * │ └──TableScan + * 
└──ExchangeNode + * └──LimitKRankingNode + * └──MergeSortNode + * └──ExchangeNode ×2, each └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan)); + assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan)); + assertPlan(planTester.getFragmentPlan(3), limitKRanking(mergeSort(exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(4), tableScan); + assertPlan(planTester.getFragmentPlan(5), tableScan); + } + + @Test + public void testLimitKRankingPushDownLimitOrderByTimeDesc() { + PlanTester planTester = new PlanTester(); + + // LIMIT pushdown + ORDER BY time DESC → LimitKRankingNode + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time DESC) as rn FROM table1) LIMIT 5"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limit(5, limitKRanking(sort(tableScan))))); + + /* OutputNode + * └──LimitNode + * └──LimitKRankingNode + * └──CollectNode + * ├──ExchangeNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), + output(limit(5, limitKRanking(collect(exchange(), exchange(), exchange()))))); + assertPlan(planTester.getFragmentPlan(1), tableScan); + assertPlan(planTester.getFragmentPlan(2), tableScan); + assertPlan(planTester.getFragmentPlan(3), tableScan); + } + @Test public void testLimitKRankingPushDownLimitOrderByTime() { PlanTester planTester = new PlanTester();
