This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/window_func_limit_k_optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8736c58b2e747db1a9c883f170e78141e8974aa3
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Fri Feb 20 21:43:29 2026 +0800

    Fix window function with ORDER BY time not working when the outer query uses ORDER BY time DESC.
---
 .../relational/it/db/it/IoTDBLimitKRankingIT.java  | 156 +++++++++++++++++++++
 .../relational/planner/iterative/rule/Util.java    |  12 +-
 .../planner/optimizations/SortElimination.java     |  43 +++++-
 .../planner/WindowFunctionOptimizationTest.java    | 115 +++++++++++++++
 4 files changed, 315 insertions(+), 11 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
index 59cfdedae6c..de90dd2009e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
@@ -244,4 +244,160 @@ public class IoTDBLimitKRankingIT {
         retArray,
         DATABASE_NAME);
   }
+
+  // ==================== ORDER BY time DESC tests ====================
+
+  @Test
+  public void testFilterPushDownOrderByTimeDesc() {
+    // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time DESC) WHERE rn <= 2
+    // Should use LimitKRankingOperator: take latest 2 rows per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+          "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+          "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") WHERE rn <= 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeDescK1() {
+    // K=1: only the latest row per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+          "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") WHERE rn <= 1 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeDescKLargerThanData() {
+    // K=100: larger than any partition, so all rows are returned
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,4,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,3,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+          "2021-01-01T09:01:00.000Z,d3,10.0,5,",
+          "2021-01-01T09:02:00.000Z,d3,20.0,4,",
+          "2021-01-01T09:03:00.000Z,d3,30.0,3,",
+          "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+          "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") WHERE rn <= 100 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testLimitPushDownOrderByTimeDesc() {
+    // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time 
DESC) LIMIT 4
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,3,",
+          "2021-01-01T09:05:00.000Z,d1,3.0,4,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") ORDER BY device, time DESC LIMIT 4",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeDescWithLessThan() {
+    // Use rn < 3 instead of rn <= 2 (should give same result as rn <= 2)
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+          "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+          "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") WHERE rn < 3 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeDescSelectSubsetColumns() {
+    // Only select time, device, value (not the ranking column)
+    String[] expectedHeader = new String[] {"time", "device", "value"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:07:00.000Z,d1,5.0,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,",
+          "2021-01-01T09:03:00.000Z,d3,30.0,",
+          "2021-01-01T09:04:00.000Z,d3,40.0,",
+          "2021-01-01T09:06:00.000Z,d3,50.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT time, device, value FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") WHERE rn <= 3 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeDescEqual() {
+    // rn = 2: only the second-latest row per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+          "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time 
DESC) as rn FROM demo"
+            + ") WHERE rn = 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
index b6a4d6aef72..b4227d11ce2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
@@ -149,9 +148,10 @@ final class Util {
   }
 
   /**
-   * Returns true when the window is ROW_NUMBER with ORDER BY on a single 
ascending TIMESTAMP
-   * column. In IoTDB, data is naturally sorted by (device, time), so we can 
use the streaming
-   * LimitKRankingNode instead of the buffered TopKRankingNode.
+   * Returns true when the window is ROW_NUMBER with ORDER BY on a single 
TIMESTAMP column (ASC or
+   * DESC). In IoTDB, data is naturally sorted by (device, time) and table 
scans support both
+   * forward and reverse iteration, so we can use the streaming 
LimitKRankingNode instead of the
+   * buffered TopKRankingNode for both ASC (earliest K) and DESC (latest K).
    */
   public static boolean canUseLimitKRanking(
       WindowNode windowNode,
@@ -172,10 +172,6 @@ final class Util {
     }
 
     Symbol orderSymbol = orderBy.get(0);
-    SortOrder sortOrder = orderingScheme.get().getOrdering(orderSymbol);
-    if (!sortOrder.isAscending()) {
-      return false;
-    }
 
     Type type = symbolAllocator.getTypes().getTableModelType(orderSymbol);
     return type instanceof TimestampType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
index bb276f07150..bdc986e63da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode;
@@ -31,6 +32,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
 import java.util.Collections;
 
@@ -72,10 +74,13 @@ public class SortElimination implements PlanOptimizer {
       OrderingScheme orderingScheme = node.getOrderingScheme();
       if (context.canEliminateSort()
           && newContext.getTotalDeviceEntrySize() == 1
-          && 
orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName()))
 {
+          && 
orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName())
+          && timeSortDirectionMatchesScanOrder(orderingScheme, newContext)) {
         return child;
       }
-      return context.canEliminateSort() && node.isOrderByAllIdsAndTime()
+      return context.canEliminateSort()
+              && node.isOrderByAllIdsAndTime()
+              && timeSortDirectionMatchesScanOrder(orderingScheme, newContext)
           ? child
           : node.replaceChildren(Collections.singletonList(child));
     }
@@ -86,17 +91,39 @@ public class SortElimination implements PlanOptimizer {
       PlanNode child = node.getChild().accept(this, newContext);
       context.setCannotEliminateSort(newContext.cannotEliminateSort);
       return context.canEliminateSort()
-              && (node.isOrderByAllIdsAndTime()
+              && ((node.isOrderByAllIdsAndTime()
+                      && 
timeSortDirectionMatchesScanOrder(node.getOrderingScheme(), newContext))
                   || node.getStreamCompareKeyEndIndex()
                       == node.getOrderingScheme().getOrderBy().size() - 1)
           ? child
           : node.replaceChildren(Collections.singletonList(child));
     }
 
+    /**
+     * Checks whether the sort direction on the time column matches the 
underlying scan order. When
+     * the scan is DESC (e.g. due to LimitKRanking with ORDER BY time DESC), a 
sort expecting ASC
+     * must not be eliminated.
+     */
+    private boolean timeSortDirectionMatchesScanOrder(
+        OrderingScheme orderingScheme, Context childContext) {
+      if (childContext.getTimeColumnName() == null) {
+        return true;
+      }
+      for (Symbol symbol : orderingScheme.getOrderBy()) {
+        if (symbol.getName().equals(childContext.getTimeColumnName())) {
+          SortOrder sortOrder = orderingScheme.getOrdering(symbol);
+          boolean scanAscending = childContext.getScanOrder() == Ordering.ASC;
+          return sortOrder.isAscending() == scanAscending;
+        }
+      }
+      return true;
+    }
+
     @Override
     public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context 
context) {
       context.addDeviceEntrySize(node.getDeviceEntries().size());
       
context.setTimeColumnName(node.getTimeColumn().map(Symbol::getName).orElse(null));
+      context.setScanOrder(node.getScanOrder());
       return node;
     }
 
@@ -152,6 +179,8 @@ public class SortElimination implements PlanOptimizer {
 
     private String timeColumnName = null;
 
+    private Ordering scanOrder = Ordering.ASC;
+
     Context() {}
 
     public void addDeviceEntrySize(int deviceEntrySize) {
@@ -177,5 +206,13 @@ public class SortElimination implements PlanOptimizer {
     public void setTimeColumnName(String timeColumnName) {
       this.timeColumnName = timeColumnName;
     }
+
+    public Ordering getScanOrder() {
+      return scanOrder;
+    }
+
+    public void setScanOrder(Ordering scanOrder) {
+      this.scanOrder = scanOrder;
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
index ba55d94af9f..5f3dc0b3dcc 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
@@ -383,6 +383,121 @@ public class WindowFunctionOptimizationTest {
     assertPlan(planTester.getFragmentPlan(5), tableScan);
   }
 
+  @Test
+  public void testLimitKRankingPushDownFilterOrderByTimeDesc() {
+    PlanTester planTester = new PlanTester();
+
+    // ORDER BY time DESC triggers LimitKRankingNode instead of TopKRankingNode
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER 
BY time DESC) as rn FROM table1) WHERE rn <= 3";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    /*
+     *   └──OutputNode
+     *         └──LimitKRankingNode
+     *              └──SortNode
+     *                   └──TableScanNode
+     */
+    assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan))));
+
+    /*  OutputNode
+     *     └──LimitKRankingNode
+     *         └──CollectNode
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               └──ExchangeNode
+     *                        └──TableScan
+     */
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(limitKRanking(collect(exchange(), exchange(), exchange()))));
+    assertPlan(planTester.getFragmentPlan(1), tableScan);
+    assertPlan(planTester.getFragmentPlan(2), tableScan);
+    assertPlan(planTester.getFragmentPlan(3), tableScan);
+  }
+
+  @Test
+  public void testLimitKRankingPushDownFilterOrderByTimeDescWithAllTags() {
+    PlanTester planTester = new PlanTester();
+
+    // All tag columns in PARTITION BY + ORDER BY time DESC → 
LimitKRankingNode with GroupNode push
+    // down
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, 
tag3 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 2";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    /*
+     *   └──OutputNode
+     *        └──LimitKRankingNode
+     *              └──GroupNode
+     *                   └──TableScanNode
+     */
+    assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan))));
+
+    /*  OutputNode
+     *         └──CollectNode
+     *               ├──ExchangeNode
+     *               │    └──LimitKRankingNode
+     *               │        └──TableScan
+     *               ├──ExchangeNode
+     *               │    └──LimitKRankingNode
+     *               │        └──TableScan
+     *               └──ExchangeNode
+     *                    └──LimitKRankingNode
+     *                        └──MergeSortNode
+     *                            └──ExchangeNode ×2 (each over a TableScan fragment)
+     */
+    assertPlan(
+        planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), 
exchange()))));
+    assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan));
+    assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan));
+    assertPlan(planTester.getFragmentPlan(3), 
limitKRanking(mergeSort(exchange(), exchange())));
+    assertPlan(planTester.getFragmentPlan(4), tableScan);
+    assertPlan(planTester.getFragmentPlan(5), tableScan);
+  }
+
+  @Test
+  public void testLimitKRankingPushDownLimitOrderByTimeDesc() {
+    PlanTester planTester = new PlanTester();
+
+    // LIMIT pushdown + ORDER BY time DESC → LimitKRankingNode
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER 
BY time DESC) as rn FROM table1) LIMIT 5";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    /*
+     *   └──OutputNode
+     *        └──LimitNode
+     *            └──LimitKRankingNode
+     *                └──SortNode
+     *                    └──TableScanNode
+     */
+    assertPlan(logicalQueryPlan, output(limit(5, 
limitKRanking(sort(tableScan)))));
+
+    /*  OutputNode
+     *    └──LimitNode
+     *      └──LimitKRankingNode
+     *          └──CollectNode
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               └──ExchangeNode
+     *                        └──TableScan
+     */
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(limit(5, limitKRanking(collect(exchange(), exchange(), 
exchange())))));
+    assertPlan(planTester.getFragmentPlan(1), tableScan);
+    assertPlan(planTester.getFragmentPlan(2), tableScan);
+    assertPlan(planTester.getFragmentPlan(3), tableScan);
+  }
+
   @Test
   public void testLimitKRankingPushDownLimitOrderByTime() {
     PlanTester planTester = new PlanTester();

Reply via email to