This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/push_limit_to_table_scan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a1313099273a03e4bfa79bed98436582898f0484
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Thu Feb 26 15:17:59 2026 +0800

    TopK push down limit to table scan node prototype.
---
 .../it/db/it/IoTDBWindowFunction3IT.java           | 124 +++++++++++++++++++++
 .../plan/planner/TableOperatorGenerator.java       |  44 +++++---
 .../distribute/TableDistributedPlanGenerator.java  |  38 ++++++-
 .../relational/planner/node/TopKRankingNode.java   |  23 +++-
 .../planner/WindowFunctionOptimizationTest.java    |  43 +++++++
 5 files changed, 253 insertions(+), 19 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
index d461f3a11fe..a0e23ea1996 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
@@ -51,6 +51,17 @@ public class IoTDBWindowFunction3IT {
         "insert into demo values (2021-01-01T09:10:00, 'd1', 1)",
         "insert into demo values (2021-01-01T09:08:00, 'd2', 2)",
         "insert into demo values (2021-01-01T09:15:00, 'd2', 4)",
+        "create table multi_tag (region string tag, plant string tag, temp 
double field)",
+        "insert into multi_tag values (2021-01-01T08:00:00, 'east', 'A', 10)",
+        "insert into multi_tag values (2021-01-01T09:00:00, 'east', 'A', 20)",
+        "insert into multi_tag values (2021-01-01T10:00:00, 'east', 'A', 15)",
+        "insert into multi_tag values (2021-01-01T11:00:00, 'east', 'A', 25)",
+        "insert into multi_tag values (2021-01-01T08:30:00, 'east', 'B', 30)",
+        "insert into multi_tag values (2021-01-01T09:30:00, 'east', 'B', 35)",
+        "insert into multi_tag values (2021-01-01T10:30:00, 'east', 'B', 32)",
+        "insert into multi_tag values (2021-01-01T07:00:00, 'west', 'C', 50)",
+        "insert into multi_tag values (2021-01-01T08:00:00, 'west', 'C', 55)",
+        "insert into multi_tag values (2021-01-01T09:00:00, 'west', 'C', 52)",
         "FLUSH",
         "CLEAR ATTRIBUTE CACHE",
       };
@@ -176,4 +187,117 @@ public class IoTDBWindowFunction3IT {
         retArray,
         DATABASE_NAME);
   }
+
+  @Test
+  public void testTopKRankingOrderByTimeAsc() {
+    // PARTITION BY all tags + ORDER BY time ASC triggers limit push-down to 
DeviceTableScanNode
+    // and streaming RowNumberOperator optimization.
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER 
BY time ASC) as rn FROM demo) WHERE rn <= 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testTopKRankingOrderByTimeDesc() {
+    // ORDER BY time DESC: returns newest K rows per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER 
BY time DESC) as rn FROM demo) WHERE rn <= 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testTopKRankingOrderByTimeLimit1() {
+    // rn <= 1: get exactly the oldest row per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER 
BY time ASC) as rn FROM demo) WHERE rn <= 1 ORDER BY device",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testTopKRankingOrderByTimeMultiTag() {
+    // Multi-tag table: PARTITION BY region, plant (all tags) ORDER BY time
+    String[] expectedHeader = new String[] {"time", "region", "plant", "temp", 
"rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T08:00:00.000Z,east,A,10.0,1,",
+          "2021-01-01T09:00:00.000Z,east,A,20.0,2,",
+          "2021-01-01T08:30:00.000Z,east,B,30.0,1,",
+          "2021-01-01T09:30:00.000Z,east,B,35.0,2,",
+          "2021-01-01T07:00:00.000Z,west,C,50.0,1,",
+          "2021-01-01T08:00:00.000Z,west,C,55.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY region, 
plant ORDER BY time ASC) as rn FROM multi_tag) WHERE rn <= 2 ORDER BY region, 
plant, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testTopKRankingOrderByTimeMultiTagDesc() {
+    // Multi-tag table: ORDER BY time DESC returns newest rows per device
+    String[] expectedHeader = new String[] {"time", "region", "plant", "temp", 
"rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T10:00:00.000Z,east,A,15.0,2,",
+          "2021-01-01T11:00:00.000Z,east,A,25.0,1,",
+          "2021-01-01T09:30:00.000Z,east,B,35.0,2,",
+          "2021-01-01T10:30:00.000Z,east,B,32.0,1,",
+          "2021-01-01T08:00:00.000Z,west,C,55.0,2,",
+          "2021-01-01T09:00:00.000Z,west,C,52.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY region, 
plant ORDER BY time DESC) as rn FROM multi_tag) WHERE rn <= 2 ORDER BY region, 
plant, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testTopKRankingOrderByTimeLimitExceedsRows() {
+    // rn <= 10 exceeds every device's row count (d1 has 4, d2 has 2) - should return all rows
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,3,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,4,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER 
BY time ASC) as rn FROM demo) WHERE rn <= 10 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 1bc788251a7..cd23122e4b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -4323,6 +4323,35 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     List<Integer> partitionChannels = 
getChannelsForSymbols(partitionBySymbols, childLayout);
     List<TSDataType> inputDataTypes =
         getOutputColumnTypes(node.getChild(), context.getTypeProvider());
+
+    ImmutableList.Builder<Integer> outputChannels = ImmutableList.builder();
+    for (int i = 0; i < inputDataTypes.size(); i++) {
+      outputChannels.add(i);
+    }
+
+    // compute the layout of the output from the window operator
+    ImmutableMap.Builder<Symbol, Integer> outputMappings = 
ImmutableMap.builder();
+    outputMappings.putAll(childLayout);
+
+    if (!node.isPartial() || !partitionChannels.isEmpty()) {
+      int channel = inputDataTypes.size();
+      outputMappings.put(node.getRankingSymbol(), channel);
+    }
+
+    if (node.isDataPreSortedAndLimited()) {
+      // Data is already limited to K rows per partition and sorted by time.
+      // Use streaming RowNumberOperator (O(n) time, O(partitions) memory)
+      // instead of heap-based TopKRankingOperator (O(n log K) time, O(n) 
memory).
+      return new RowNumberOperator(
+          operatorContext,
+          child,
+          inputDataTypes,
+          outputChannels.build(),
+          partitionChannels,
+          Optional.of(node.getMaxRankingPerPartition()),
+          10_000);
+    }
+
     List<TSDataType> partitionTypes =
         
partitionChannels.stream().map(inputDataTypes::get).collect(toImmutableList());
 
@@ -4341,21 +4370,6 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
               .collect(toImmutableList());
     }
 
-    ImmutableList.Builder<Integer> outputChannels = ImmutableList.builder();
-    for (int i = 0; i < inputDataTypes.size(); i++) {
-      outputChannels.add(i);
-    }
-
-    // compute the layout of the output from the window operator
-    ImmutableMap.Builder<Symbol, Integer> outputMappings = 
ImmutableMap.builder();
-    outputMappings.putAll(childLayout);
-
-    if (!node.isPartial() || !partitionChannels.isEmpty()) {
-      // ranking function goes in the last channel
-      int channel = inputDataTypes.size();
-      outputMappings.put(node.getRankingSymbol(), channel);
-    }
-
     return new TopKRankingOperator(
         operatorContext,
         child,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f..52dad04df42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -442,6 +442,33 @@ public class TableDistributedPlanGenerator
     return false;
   }
 
+  private boolean tryPushTopKRankingLimitToScan(
+      TopKRankingNode topKRankingNode, List<PlanNode> children, OrderingScheme 
orderingScheme) {
+    List<Symbol> orderBy = orderingScheme.getOrderBy();
+    if (orderBy.size() != 1) {
+      return false;
+    }
+    Symbol orderSymbol = orderBy.get(0);
+    long limit = topKRankingNode.getMaxRankingPerPartition();
+    boolean pushed = false;
+
+    for (PlanNode child : children) {
+      if (child instanceof DeviceTableScanNode && !(child instanceof 
AggregationTableScanNode)) {
+        DeviceTableScanNode scanNode = (DeviceTableScanNode) child;
+        if (scanNode.isTimeColumn(orderSymbol)) {
+          scanNode.setPushLimitToEachDevice(true);
+          if (scanNode.getPushDownLimit() <= 0) {
+            scanNode.setPushDownLimit(limit);
+          } else {
+            scanNode.setPushDownLimit(Math.min(limit, 
scanNode.getPushDownLimit()));
+          }
+          pushed = true;
+        }
+      }
+    }
+    return pushed;
+  }
+
   @Override
   public List<PlanNode> visitGroup(GroupNode node, PlanContext context) {
     context.setExpectedOrderingScheme(node.getOrderingScheme());
@@ -1885,7 +1912,6 @@ public class TableDistributedPlanGenerator
       nodeOrderingMap.put(node.getPlanNodeId(), orderingScheme.get());
     }
 
-    // TODO: per partition topk eliminate
     checkArgument(
         node.getChildren().size() == 1, "Size of TopKRankingNode can only be 1 
in logical plan.");
     boolean canSplitPushDown = node.getChild() instanceof GroupNode;
@@ -1894,12 +1920,20 @@ public class TableDistributedPlanGenerator
     }
     List<PlanNode> childrenNodes = node.getChildren().get(0).accept(this, 
context);
     if (canSplitPushDown) {
+      // visitGroup may return GroupNode-wrapped children (sort not 
eliminated) or bare
+      // DeviceTableScanNode (sort eliminated). Unwrap GroupNode/SortNode when 
present.
       childrenNodes =
           childrenNodes.stream()
-              .map(child -> child.getChildren().get(0))
+              .map(child -> child instanceof SortNode ? 
child.getChildren().get(0) : child)
               .collect(Collectors.toList());
     }
 
+    if (canSplitPushDown && orderingScheme.isPresent()) {
+      if (tryPushTopKRankingLimitToScan(node, childrenNodes, 
orderingScheme.get())) {
+        node.setDataPreSortedAndLimited(true);
+      }
+    }
+
     if (childrenNodes.size() == 1) {
       node.setChild(childrenNodes.get(0));
       return Collections.singletonList(node);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
index fedb7e1f2e1..fd41ab4ca70 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
@@ -50,6 +50,11 @@ public class TopKRankingNode extends SingleChildProcessNode {
   private final int maxRankingPerPartition;
   private final boolean partial;
 
+  // When true, the child scan already returns pre-sorted, pre-limited data 
(at most K rows per
+  // partition, ordered correctly). The operator can skip heap-based TopK 
selection and just assign
+  // sequential row numbers (streaming mode).
+  private boolean dataPreSortedAndLimited = false;
+
   public TopKRankingNode(
       PlanNodeId id,
       DataOrganizationSpecification specification,
@@ -119,6 +124,14 @@ public class TopKRankingNode extends 
SingleChildProcessNode {
     return rankingType;
   }
 
+  public boolean isDataPreSortedAndLimited() {
+    return dataPreSortedAndLimited;
+  }
+
+  public void setDataPreSortedAndLimited(boolean dataPreSortedAndLimited) {
+    this.dataPreSortedAndLimited = dataPreSortedAndLimited;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     throw new UnsupportedOperationException();
@@ -132,6 +145,7 @@ public class TopKRankingNode extends SingleChildProcessNode 
{
     Symbol.serialize(rankingSymbol, byteBuffer);
     ReadWriteIOUtils.write(maxRankingPerPartition, byteBuffer);
     ReadWriteIOUtils.write(partial, byteBuffer);
+    ReadWriteIOUtils.write(dataPreSortedAndLimited, byteBuffer);
   }
 
   @Override
@@ -142,6 +156,7 @@ public class TopKRankingNode extends SingleChildProcessNode 
{
     Symbol.serialize(rankingSymbol, stream);
     ReadWriteIOUtils.write(maxRankingPerPartition, stream);
     ReadWriteIOUtils.write(partial, stream);
+    ReadWriteIOUtils.write(dataPreSortedAndLimited, stream);
   }
 
   public static TopKRankingNode deserialize(ByteBuffer byteBuffer) {
@@ -151,10 +166,14 @@ public class TopKRankingNode extends 
SingleChildProcessNode {
     Symbol rankingSymbol = Symbol.deserialize(byteBuffer);
     int maxRankingPerPartition = ReadWriteIOUtils.readInt(byteBuffer);
     boolean partial = ReadWriteIOUtils.readBoolean(byteBuffer);
+    boolean dataPreSortedAndLimited = ReadWriteIOUtils.readBoolean(byteBuffer);
 
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new TopKRankingNode(
-        planNodeId, specification, rankingType, rankingSymbol, 
maxRankingPerPartition, partial);
+    TopKRankingNode node =
+        new TopKRankingNode(
+            planNodeId, specification, rankingType, rankingSymbol, 
maxRankingPerPartition, partial);
+    node.setDataPreSortedAndLimited(dataPreSortedAndLimited);
+    return node;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
index e31f2f7e580..6618ad518c5 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
@@ -20,12 +20,19 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
@@ -297,4 +304,40 @@ public class WindowFunctionOptimizationTest {
     assertPlan(planTester.getFragmentPlan(1), rowNumber(tableScan));
     assertPlan(planTester.getFragmentPlan(2), rowNumber(tableScan));
   }
+
+  @Test
+  public void testTopKRankingOrderByTimeLimitPushDown() {
+    PlanTester planTester = new PlanTester();
+
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, 
tag3 ORDER BY time) as rn FROM table1) WHERE rn <= 2";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Logical plan: OutputNode -> TopKRankingNode -> GroupNode -> 
TableScanNode
+    assertPlan(logicalQueryPlan, output(topKRanking(group(tableScan))));
+
+    // Distributed plan: TopKRankingNode pushed down to each partition with 
limit push-down.
+    // Fragment 0: OutputNode -> CollectNode -> ExchangeNodes
+    assertPlan(
+        planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), 
exchange())));
+
+    // Worker fragments: TopKRankingNode -> DeviceTableScanNode
+    // Verify limit is pushed to DeviceTableScanNode and TopKRankingNode is 
marked for streaming.
+    for (int i = 1; i <= 2; i++) {
+      PlanNode fragmentRoot = planTester.getFragmentPlan(i);
+      assertTrue(
+          "Fragment " + i + " root should be TopKRankingNode",
+          fragmentRoot instanceof TopKRankingNode);
+      TopKRankingNode topKNode = (TopKRankingNode) fragmentRoot;
+
+      PlanNode scanChild = topKNode.getChild();
+      assertNotNull("TopKRankingNode should have a child", scanChild);
+      assertTrue(
+          "Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
+      DeviceTableScanNode dts = (DeviceTableScanNode) scanChild;
+      assertTrue("pushLimitToEachDevice should be true", 
dts.isPushLimitToEachDevice());
+      assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit());
+    }
+  }
 }

Reply via email to