This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/window_func_limit_k_optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fc28ab6c65539f58d765d765511af771f81e80bc
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Fri Feb 20 00:45:04 2026 +0800

    Prototype of the Limit-K optimization for window functions.
---
 .../relational/it/db/it/IoTDBLimitKRankingIT.java  | 247 +++++++++++++
 .../process/window/LimitKRankingOperator.java      | 230 +++++++++++++
 .../plan/planner/TableOperatorGenerator.java       |  44 +++
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  19 +
 .../plan/planner/plan/node/PlanNodeType.java       |   5 +
 .../plan/planner/plan/node/PlanVisitor.java        |   6 +
 .../distribute/TableDistributedPlanGenerator.java  |  31 ++
 .../iterative/rule/PushDownFilterIntoWindow.java   |  32 +-
 .../iterative/rule/PushDownLimitIntoWindow.java    |  38 +-
 .../relational/planner/iterative/rule/Util.java    |  38 ++
 .../relational/planner/node/LimitKRankingNode.java | 165 +++++++++
 .../planner/optimizations/SymbolMapper.java        |  10 +
 .../optimizations/UnaliasSymbolReferences.java     |  12 +
 .../process/window/LimitKRankingOperatorTest.java  | 383 +++++++++++++++++++++
 .../planner/WindowFunctionOptimizationTest.java    | 117 +++++++
 .../planner/assertions/PlanMatchPattern.java       |   5 +
 16 files changed, 1362 insertions(+), 20 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
new file mode 100644
index 00000000000..59cfdedae6c
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for the LimitKRanking optimization. When using {@code ROW_NUMBER() OVER
+ * (PARTITION BY device ORDER BY time)}, the planner should produce a streaming LimitKRankingNode
+ * instead of the buffered TopKRankingNode, since the input data is already sorted by
+ * (device, time).
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBLimitKRankingIT {
+  private static final String DATABASE_NAME = "test";
+  private static final String[] sqls =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        "create table demo (device string tag, value double field)",
+        // d1: 4 rows
+        "insert into demo values (2021-01-01T09:05:00, 'd1', 3)",
+        "insert into demo values (2021-01-01T09:07:00, 'd1', 5)",
+        "insert into demo values (2021-01-01T09:09:00, 'd1', 3)",
+        "insert into demo values (2021-01-01T09:10:00, 'd1', 1)",
+        // d2: 2 rows
+        "insert into demo values (2021-01-01T09:08:00, 'd2', 2)",
+        "insert into demo values (2021-01-01T09:15:00, 'd2', 4)",
+        // d3: 5 rows
+        "insert into demo values (2021-01-01T09:01:00, 'd3', 10)",
+        "insert into demo values (2021-01-01T09:02:00, 'd3', 20)",
+        "insert into demo values (2021-01-01T09:03:00, 'd3', 30)",
+        "insert into demo values (2021-01-01T09:04:00, 'd3', 40)",
+        "insert into demo values (2021-01-01T09:06:00, 'd3', 50)",
+        "FLUSH",
+        "CLEAR ATTRIBUTE CACHE",
+      };
+
+  protected static void insertData() {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      fail("insertData failed.");
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 
1024);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTime() {
+    // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time) WHERE rn <= 2
+    // Should use LimitKRankingOperator: take first 2 time-ordered rows per 
device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+          "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+          "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") WHERE rn <= 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeK1() {
+    // K=1: only the earliest row per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") WHERE rn <= 1 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeKLargerThanData() {
+    // K=100: larger than any partition, so all rows are returned
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,3,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,4,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+          "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+          "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+          "2021-01-01T09:03:00.000Z,d3,30.0,3,",
+          "2021-01-01T09:04:00.000Z,d3,40.0,4,",
+          "2021-01-01T09:06:00.000Z,d3,50.0,5,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") WHERE rn <= 100 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testLimitPushDownOrderByTime() {
+    // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time) 
LIMIT 4
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,3,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,4,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") ORDER BY device, time LIMIT 4",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeWithLessThan() {
+    // rn < 3 is equivalent to rn <= 2, so this must produce the same result as the rn <= 2 test
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+          "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+          "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") WHERE rn < 3 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeSelectSubsetColumns() {
+    // Select only time, device and value (exclude the ranking column rn)
+    String[] expectedHeader = new String[] {"time", "device", "value"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,",
+          "2021-01-01T09:01:00.000Z,d3,10.0,",
+          "2021-01-01T09:02:00.000Z,d3,20.0,",
+          "2021-01-01T09:03:00.000Z,d3,30.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT time, device, value FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") WHERE rn <= 3 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testFilterPushDownOrderByTimeEqual() {
+    // rn = 2: only the second row per device
+    String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+          "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM ("
+            + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) 
as rn FROM demo"
+            + ") WHERE rn = 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java
new file mode 100644
index 00000000000..11f75ab9dde
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash;
+
+/**
+ * Streaming operator optimized for {@code PARTITION BY device ORDER BY time} with a top-K filter.
+ *
+ * <p>Unlike {@link TopKRankingOperator}, which buffers all input and uses heap-based selection,
+ * this operator exploits the fact that input data is already sorted by (device, time). It simply
+ * counts rows per partition and emits the first K rows of each partition, discarding the rest.
+ * This yields O(N) time with memory proportional to the number of distinct partitions, compared
+ * to O(N log K) time for the general TopK path.
+ */
+public class LimitKRankingOperator implements ProcessOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(LimitKRankingOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator inputOperator;
+  private final List<TSDataType> inputDataTypes;
+
+  private final List<Integer> outputChannels;
+  private final List<Integer> partitionChannels;
+  private final int maxRowCountPerPartition;
+  private final boolean produceRowNumber;
+
+  private final Optional<GroupByHash> groupByHash;
+  private final Map<Integer, Long> partitionRowCounts;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  public LimitKRankingOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> inputDataTypes,
+      List<Integer> outputChannels,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTSDataTypes,
+      int maxRowCountPerPartition,
+      boolean produceRowNumber,
+      int expectedPositions) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.inputDataTypes = inputDataTypes;
+    this.outputChannels = ImmutableList.copyOf(outputChannels);
+    this.partitionChannels = ImmutableList.copyOf(partitionChannels);
+    this.maxRowCountPerPartition = maxRowCountPerPartition;
+    this.produceRowNumber = produceRowNumber;
+
+    List<TSDataType> outputDataTypes = new ArrayList<>();
+    for (int channel : outputChannels) {
+      outputDataTypes.add(inputDataTypes.get(channel));
+    }
+    if (produceRowNumber) {
+      outputDataTypes.add(TSDataType.INT64);
+    }
+    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+
+    if (partitionChannels.isEmpty()) {
+      this.groupByHash = Optional.empty();
+    } else {
+      List<Type> partitionTypes = new ArrayList<>(partitionTSDataTypes.size());
+      for (TSDataType tsDataType : partitionTSDataTypes) {
+        partitionTypes.add(InternalTypeManager.fromTSDataType(tsDataType));
+      }
+      this.groupByHash =
+          Optional.of(
+              createGroupByHash(partitionTypes, false, expectedPositions, 
UpdateMemory.NOOP));
+    }
+
+    this.partitionRowCounts = new HashMap<>(expectedPositions);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    TsBlock input = inputOperator.nextWithTimer();
+    if (input == null) {
+      return null;
+    }
+
+    int positionCount = input.getPositionCount();
+    int[] partitionIds = getPartitionIds(input);
+
+    for (int position = 0; position < positionCount; position++) {
+      int partitionId = groupByHash.isPresent() ? partitionIds[position] : 0;
+      long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L);
+
+      if (rowCount < maxRowCountPerPartition) {
+        emitRow(input, position, rowCount + 1);
+      }
+
+      partitionRowCounts.put(partitionId, rowCount + 1);
+    }
+
+    if (tsBlockBuilder.getPositionCount() == 0) {
+      tsBlockBuilder.reset();
+      return null;
+    }
+
+    TsBlock result =
+        tsBlockBuilder.build(
+            new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
tsBlockBuilder.getPositionCount()));
+    tsBlockBuilder.reset();
+    return result;
+  }
+
+  private void emitRow(TsBlock tsBlock, int position, long rowNumber) {
+    for (int i = 0; i < outputChannels.size(); i++) {
+      Column column = tsBlock.getColumn(outputChannels.get(i));
+      ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(i);
+      if (column.isNull(position)) {
+        columnBuilder.appendNull();
+      } else {
+        columnBuilder.write(column, position);
+      }
+    }
+
+    if (produceRowNumber) {
+      
tsBlockBuilder.getColumnBuilder(outputChannels.size()).writeLong(rowNumber);
+    }
+
+    tsBlockBuilder.declarePosition();
+  }
+
+  private int[] getPartitionIds(TsBlock tsBlock) {
+    if (groupByHash.isPresent()) {
+      Column[] partitionColumns = new Column[partitionChannels.size()];
+      for (int i = 0; i < partitionChannels.size(); i++) {
+        partitionColumns[i] = tsBlock.getColumn(partitionChannels.get(i));
+      }
+      return groupByHash.get().getGroupIds(partitionColumns);
+    }
+    return new int[0];
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return inputOperator.hasNextWithTimer();
+  }
+
+  @Override
+  public void close() throws Exception {
+    inputOperator.close();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNext();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return inputOperator.isBlocked();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemoryFromInput = 
inputOperator.calculateMaxPeekMemoryWithCounter();
+    long maxTsBlockSize = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    return Math.max(maxPeekMemoryFromInput, maxTsBlockSize)
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + tsBlockBuilder.getRetainedSizeInBytes();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 1bc788251a7..cfcfb4a99f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -101,6 +101,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.exp
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.IrRowPatternToProgramRewriter;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Matcher;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Program;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.LimitKRankingOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.RowNumberOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.TopKRankingOperator;
@@ -198,6 +199,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
@@ -4372,4 +4374,46 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         1000,
         Optional.empty());
   }
+
+  @Override
+  public Operator visitLimitKRanking(LimitKRankingNode node, 
LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                LimitKRankingOperator.class.getSimpleName());
+
+    List<Symbol> partitionBySymbols = node.getSpecification().getPartitionBy();
+    Map<Symbol, Integer> childLayout =
+        makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());
+    List<Integer> partitionChannels = 
getChannelsForSymbols(partitionBySymbols, childLayout);
+    List<TSDataType> inputDataTypes =
+        getOutputColumnTypes(node.getChild(), context.getTypeProvider());
+    List<TSDataType> partitionTypes =
+        
partitionChannels.stream().map(inputDataTypes::get).collect(toImmutableList());
+
+    ImmutableList.Builder<Integer> outputChannels = ImmutableList.builder();
+    for (int i = 0; i < inputDataTypes.size(); i++) {
+      outputChannels.add(i);
+    }
+
+    ImmutableMap.Builder<Symbol, Integer> outputMappings = 
ImmutableMap.builder();
+    outputMappings.putAll(childLayout);
+    int channel = inputDataTypes.size();
+    outputMappings.put(node.getRankingSymbol(), channel);
+
+    return new LimitKRankingOperator(
+        operatorContext,
+        child,
+        inputDataTypes,
+        outputChannels.build(),
+        partitionChannels,
+        partitionTypes,
+        node.getMaxRowCountPerPartition(),
+        true,
+        1000);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 180241b4192..ae24abe803d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -79,6 +79,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnaly
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1155,6 +1156,24 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitLimitKRanking(LimitKRankingNode node, GraphContext 
context) {
+    List<String> boxValue = new ArrayList<>();
+
+    boxValue.add(String.format("LimitKRanking-%s", 
node.getPlanNodeId().getId()));
+    boxValue.add(String.format("RankingSymbol: %s", node.getRankingSymbol()));
+    boxValue.add(String.format("MaxRowCount: %d", 
node.getMaxRowCountPerPartition()));
+    DataOrganizationSpecification specification = node.getSpecification();
+    if (!specification.getPartitionBy().isEmpty()) {
+      boxValue.add("Partition by: [" + Joiner.on(", 
").join(specification.getPartitionBy()) + "]");
+    }
+    specification
+        .getOrderingScheme()
+        .ifPresent(orderingScheme -> boxValue.add("Order by: " + 
orderingScheme));
+
+    return render(node, boxValue, context);
+  }
+
   private String printRegion(TRegionReplicaSet regionReplicaSet) {
     return String.format(
         "Partition: %s",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 55aaefe8be6..c9c118e6a3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -125,6 +125,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -324,6 +325,8 @@ public enum PlanNodeType {
   TABLE_ROW_NUMBER_NODE((short) 1038),
   TABLE_VALUES_NODE((short) 1039),
 
+  TABLE_LIMITK_RANKING_NODE((short) 1040),
+
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
   RELATIONAL_INSERT_ROWS((short) 2002),
@@ -727,6 +730,8 @@ public enum PlanNodeType {
         return RowNumberNode.deserialize(buffer);
       case 1039:
         return ValuesNode.deserialize(buffer);
+      case 1040:
+        return LimitKRankingNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 44f1cd8bc1f..d1a9958dea5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -795,6 +795,12 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
+  public R visitLimitKRanking(
+      
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode 
node,
+      C context) {
+    return visitSingleChildProcess(node, context);
+  }
+
   public R visitRowNumber(RowNumberNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f..9defd43755b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -69,6 +69,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
@@ -1914,6 +1915,36 @@ public class TableDistributedPlanGenerator
     }
   }
 
+  @Override
+  public List<PlanNode> visitLimitKRanking(LimitKRankingNode node, PlanContext 
context) {
+    Optional<OrderingScheme> orderingScheme = 
node.getSpecification().getOrderingScheme();
+    if (orderingScheme.isPresent()) {
+      context.setExpectedOrderingScheme(orderingScheme.get());
+      nodeOrderingMap.put(node.getPlanNodeId(), orderingScheme.get());
+    }
+
+    checkArgument(
+        node.getChildren().size() == 1, "Size of LimitKRankingNode can only be 
1 in logical plan.");
+    boolean canSplitPushDown = node.getChild() instanceof GroupNode;
+    if (!canSplitPushDown) {
+      node.setChild(((SortNode) node.getChild()).getChild());
+    }
+    List<PlanNode> childrenNodes = node.getChildren().get(0).accept(this, 
context);
+
+    if (childrenNodes.size() == 1) {
+      node.setChild(childrenNodes.get(0));
+      return Collections.singletonList(node);
+    } else if (!canSplitPushDown) {
+      CollectNode collectNode =
+          new CollectNode(queryId.genPlanNodeId(), 
node.getChildren().get(0).getOutputSymbols());
+      childrenNodes.forEach(collectNode::addChild);
+      node.setChild(collectNode);
+      return Collections.singletonList(node);
+    } else {
+      return splitForEachChild(node, childrenNodes);
+    }
+  }
+
   @Override
   public List<PlanNode> visitUnion(UnionNode node, PlanContext context) {
     context.clearExpectedOrderingScheme();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
index d7ecab9cabf..c1c5e7734ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
 
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValuesNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
@@ -42,6 +44,7 @@ import java.util.OptionalInt;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static java.lang.Math.toIntExact;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.canUseLimitKRanking;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
@@ -90,15 +93,26 @@ public class PushDownFilterIntoWindow implements 
Rule<FilterNode> {
           new ValuesNode(node.getPlanNodeId(), node.getOutputSymbols(), 
ImmutableList.of()));
     }
 
-    TopKRankingNode newSource =
-        new TopKRankingNode(
-            windowNode.getPlanNodeId(),
-            windowNode.getChild(),
-            windowNode.getSpecification(),
-            rankingType.get(),
-            rankingSymbol,
-            upperBound.getAsInt(),
-            false);
+    PlanNode newSource;
+    if (canUseLimitKRanking(windowNode, rankingType.get(), 
context.getSymbolAllocator())) {
+      newSource =
+          new LimitKRankingNode(
+              windowNode.getPlanNodeId(),
+              windowNode.getChild(),
+              windowNode.getSpecification(),
+              rankingSymbol,
+              upperBound.getAsInt());
+    } else {
+      newSource =
+          new TopKRankingNode(
+              windowNode.getPlanNodeId(),
+              windowNode.getChild(),
+              windowNode.getSpecification(),
+              rankingType.get(),
+              rankingSymbol,
+              upperBound.getAsInt(),
+              false);
+    }
 
     if (needToKeepFilter(node.getPredicate(), rankingSymbol, 
upperBound.getAsInt())) {
       return Result.ofPlanNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
index e3ce8619c5b..b8f2f415d0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
 
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
@@ -34,6 +36,7 @@ import java.util.Optional;
 
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static java.lang.Math.toIntExact;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.canUseLimitKRanking;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
@@ -83,18 +86,31 @@ public class PushDownLimitIntoWindow implements 
Rule<LimitNode> {
     Optional<TopKRankingNode.RankingType> rankingType = 
toTopNRankingType(source);
 
     int limit = toIntExact(node.getCount());
-    TopKRankingNode topNRowNumberNode =
-        new TopKRankingNode(
-            source.getPlanNodeId(),
-            source.getChild(),
-            source.getSpecification(),
-            rankingType.get(),
-            getOnlyElement(source.getWindowFunctions().keySet()),
-            limit,
-            false);
+
+    PlanNode rankingNode;
+    if (canUseLimitKRanking(source, rankingType.get(), 
context.getSymbolAllocator())) {
+      rankingNode =
+          new LimitKRankingNode(
+              source.getPlanNodeId(),
+              source.getChild(),
+              source.getSpecification(),
+              getOnlyElement(source.getWindowFunctions().keySet()),
+              limit);
+    } else {
+      rankingNode =
+          new TopKRankingNode(
+              source.getPlanNodeId(),
+              source.getChild(),
+              source.getSpecification(),
+              rankingType.get(),
+              getOnlyElement(source.getWindowFunctions().keySet()),
+              limit,
+              false);
+    }
+
     if (rankingType.get() == ROW_NUMBER && 
source.getSpecification().getPartitionBy().isEmpty()) {
-      return Result.ofPlanNode(topNRowNumberNode);
+      return Result.ofPlanNode(rankingNode);
     }
-    return Result.ofPlanNode(replaceChildren(node, 
ImmutableList.of(topNRowNumberNode)));
+    return Result.ofPlanNode(replaceChildren(node, 
ImmutableList.of(rankingNode)));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
index 5845ffd4221..b6a4d6aef72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
@@ -22,7 +22,10 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
@@ -32,6 +35,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.apache.tsfile.read.common.type.TimestampType;
+import org.apache.tsfile.read.common.type.Type;
 
 import java.util.Collection;
 import java.util.List;
@@ -142,4 +147,37 @@ final class Util {
     }
     return Optional.empty();
   }
+
  /**
   * Returns true when the window is ROW_NUMBER with ORDER BY on a single ascending TIMESTAMP
   * column. In IoTDB, data is naturally sorted by (device, time), so we can use the streaming
   * LimitKRankingNode instead of the buffered TopKRankingNode.
   *
   * <p>NOTE(review): this check does not inspect the PARTITION BY columns; it assumes the
   * planner guarantees the window input is sorted by (partition keys, time) — confirm that
   * invariant holds for every plan shape that reaches this rule.
   */
  public static boolean canUseLimitKRanking(
      WindowNode windowNode,
      TopKRankingNode.RankingType rankingType,
      SymbolAllocator symbolAllocator) {
    // Only ROW_NUMBER gives each row a distinct, increasing rank, so "rank <= K" is
    // equivalent to "first K rows per partition" on pre-sorted input.
    if (rankingType != ROW_NUMBER) {
      return false;
    }

    // Without an explicit ORDER BY we cannot relate ranks to the storage order.
    Optional<OrderingScheme> orderingScheme = windowNode.getSpecification().getOrderingScheme();
    if (!orderingScheme.isPresent()) {
      return false;
    }

    // Exactly one ordering column is required to match the natural single-key time order.
    List<Symbol> orderBy = orderingScheme.get().getOrderBy();
    if (orderBy.size() != 1) {
      return false;
    }

    // The column must be ascending, matching the natural storage order.
    Symbol orderSymbol = orderBy.get(0);
    SortOrder sortOrder = orderingScheme.get().getOrdering(orderSymbol);
    if (!sortOrder.isAscending()) {
      return false;
    }

    // And its type must be TIMESTAMP.
    Type type = symbolAllocator.getTypes().getTableModelType(orderSymbol);
    return type instanceof TimestampType;
  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java
new file mode 100644
index 00000000000..a78c9f9bbda
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Plan node for the streaming per-partition limit optimization. Used when the 
input data is already
+ * sorted by (partition_keys, time), so we can take the first K rows per 
partition without sorting.
+ */
+public class LimitKRankingNode extends SingleChildProcessNode {
+
+  private final DataOrganizationSpecification specification;
+  private final Symbol rankingSymbol;
+  private final int maxRowCountPerPartition;
+
+  public LimitKRankingNode(
+      PlanNodeId id,
+      DataOrganizationSpecification specification,
+      Symbol rankingSymbol,
+      int maxRowCountPerPartition) {
+    super(id);
+    this.specification = specification;
+    this.rankingSymbol = rankingSymbol;
+    this.maxRowCountPerPartition = maxRowCountPerPartition;
+  }
+
+  public LimitKRankingNode(
+      PlanNodeId id,
+      PlanNode child,
+      DataOrganizationSpecification specification,
+      Symbol rankingSymbol,
+      int maxRowCountPerPartition) {
+    super(id, child);
+    this.specification = specification;
+    this.rankingSymbol = rankingSymbol;
+    this.maxRowCountPerPartition = maxRowCountPerPartition;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new LimitKRankingNode(
+        getPlanNodeId(), specification, rankingSymbol, 
maxRowCountPerPartition);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLimitKRanking(this, context);
+  }
+
+  public DataOrganizationSpecification getSpecification() {
+    return specification;
+  }
+
+  public Symbol getRankingSymbol() {
+    return rankingSymbol;
+  }
+
+  public int getMaxRowCountPerPartition() {
+    return maxRowCountPerPartition;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return ImmutableList.<Symbol>builder()
+        .addAll(getChild().getOutputSymbols())
+        .add(rankingSymbol)
+        .build();
+  }
+
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    return new LimitKRankingNode(
+        id,
+        Iterables.getOnlyElement(newChildren),
+        specification,
+        rankingSymbol,
+        maxRowCountPerPartition);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_LIMITK_RANKING_NODE.serialize(byteBuffer);
+    specification.serialize(byteBuffer);
+    Symbol.serialize(rankingSymbol, byteBuffer);
+    ReadWriteIOUtils.write(maxRowCountPerPartition, byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.TABLE_LIMITK_RANKING_NODE.serialize(stream);
+    specification.serialize(stream);
+    Symbol.serialize(rankingSymbol, stream);
+    ReadWriteIOUtils.write(maxRowCountPerPartition, stream);
+  }
+
+  public static LimitKRankingNode deserialize(ByteBuffer byteBuffer) {
+    DataOrganizationSpecification specification =
+        DataOrganizationSpecification.deserialize(byteBuffer);
+    Symbol rankingSymbol = Symbol.deserialize(byteBuffer);
+    int maxRowCountPerPartition = ReadWriteIOUtils.readInt(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new LimitKRankingNode(planNodeId, specification, rankingSymbol, 
maxRowCountPerPartition);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    LimitKRankingNode that = (LimitKRankingNode) o;
+    return Objects.equal(specification, that.specification)
+        && Objects.equal(rankingSymbol, that.rankingSymbol)
+        && Objects.equal(maxRowCountPerPartition, 
that.maxRowCountPerPartition);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        super.hashCode(), specification, rankingSymbol, 
maxRowCountPerPartition);
+  }
+
+  @Override
+  public String toString() {
+    return "LimitKRankingNode-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
index 002e59f124c..e03140fb42a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionRewr
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionTreeRewriter;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -300,6 +301,15 @@ public class SymbolMapper {
         node.isPartial());
   }
 
  /**
   * Rebuilds a {@link LimitKRankingNode} on top of an already-mapped source, translating the
   * specification symbols (via mapAndDistinct) and the ranking output symbol through the current
   * mapping. The limit K is carried over unchanged.
   */
  public LimitKRankingNode map(LimitKRankingNode node, PlanNode source) {
    return new LimitKRankingNode(
        node.getPlanNodeId(),
        source,
        mapAndDistinct(node.getSpecification()),
        map(node.getRankingSymbol()),
        node.getMaxRowCountPerPartition());
  }
+
   public RowNumberNode map(RowNumberNode node, PlanNode source) {
     return new RowNumberNode(
         node.getPlanNodeId(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index b7f2e9eaaa8..540e20d0436 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -46,6 +46,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationS
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
@@ -650,6 +651,17 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
       return new PlanAndMappings(rewrittenTopKRanking, mapping);
     }
 
    /** Unaliases the child first, then rewrites this node's symbols through the child's mapping. */
    @Override
    public PlanAndMappings visitLimitKRanking(LimitKRankingNode node, UnaliasContext context) {
      // Rewrite the source; its symbol mapping drives the rewrite of this node.
      PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
      Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings());
      SymbolMapper mapper = symbolMapper(mapping);

      LimitKRankingNode rewrittenNode = mapper.map(node, rewrittenSource.getRoot());

      // No new aliases are introduced here, so the child's mapping is propagated as-is.
      return new PlanAndMappings(rewrittenNode, mapping);
    }
+
     @Override
     public PlanAndMappings visitOutput(OutputNode node, UnaliasContext 
context) {
       PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
new file mode 100644
index 00000000000..9acae751378
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class LimitKRankingOperatorTest {
+  private static final ExecutorService instanceNotificationExecutor =
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"limitKRanking-test-instance-notification");
+
+  private static final List<TSDataType> INPUT_DATA_TYPES =
+      Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32);
+
  /** One input block, two partitions, K=2: only the first two rows of each device survive. */
  @Test
  public void testSingleBlockMultiPartition() {
    // Input: d1 has 3 rows, d2 has 3 rows; K=2 → keep first 2 per partition
    long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
    String[][] deviceArray = {{"d1", "d1", "d1", "d2", "d2", "d2"}};
    int[][] valueArray = {{10, 20, 30, 40, 50, 60}};

    long[] expectTime = {1, 2, 4, 5};
    String[] expectDevice = {"d1", "d1", "d2", "d2"};
    int[] expectValue = {10, 20, 40, 50};

    verifyOperatorOutput(
        timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
  }
+
  /** Partitions straddle TsBlock boundaries: the per-partition counter must persist across blocks. */
  @Test
  public void testPartitionCrossMultiBlocks() {
    // d1 spans block0 and block1; d2 spans block1 and block2; K=3
    long[][] timeArray = {{1, 2}, {3, 4, 5}, {6, 7, 8}};
    String[][] deviceArray = {{"d1", "d1"}, {"d1", "d1", "d2"}, {"d2", "d2", "d2"}};
    int[][] valueArray = {{10, 20}, {30, 40, 50}, {60, 70, 80}};

    long[] expectTime = {1, 2, 3, 5, 6, 7};
    String[] expectDevice = {"d1", "d1", "d1", "d2", "d2", "d2"};
    int[] expectValue = {10, 20, 30, 50, 60, 70};

    verifyOperatorOutput(
        timeArray, deviceArray, valueArray, 3, false, expectTime, expectDevice, expectValue, null);
  }
+
  /** produceRowNumber=true: an extra column with the 1-based per-partition row number is emitted. */
  @Test
  public void testWithRowNumber() {
    long[][] timeArray = {{1, 2, 3, 4}};
    String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
    int[][] valueArray = {{10, 20, 30, 40}};

    long[] expectTime = {1, 2, 3, 4};
    String[] expectDevice = {"d1", "d1", "d2", "d2"};
    int[] expectValue = {10, 20, 30, 40};
    // Row numbers restart at 1 for each partition.
    long[] expectRowNumber = {1, 2, 1, 2};

    verifyOperatorOutput(
        timeArray,
        deviceArray,
        valueArray,
        2,
        true,
        expectTime,
        expectDevice,
        expectValue,
        expectRowNumber);
  }
+
  /** K exceeds every partition's row count: the operator must pass all rows through unchanged. */
  @Test
  public void testKLargerThanData() {
    // K=10, but only 2 rows per partition → all rows emitted
    long[][] timeArray = {{1, 2, 3, 4}};
    String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
    int[][] valueArray = {{10, 20, 30, 40}};

    long[] expectTime = {1, 2, 3, 4};
    String[] expectDevice = {"d1", "d1", "d2", "d2"};
    int[] expectValue = {10, 20, 30, 40};

    verifyOperatorOutput(
        timeArray, deviceArray, valueArray, 10, false, expectTime, expectDevice, expectValue, null);
  }
+
  /** A whole input block belongs to an already-exhausted partition and must produce no output. */
  @Test
  public void testEntireBlockFiltered() {
    // K=2, d1 gets 2 rows in block0 → block1 (all d1) should be fully filtered
    long[][] timeArray = {{1, 2}, {3, 4}};
    String[][] deviceArray = {{"d1", "d1"}, {"d1", "d1"}};
    int[][] valueArray = {{10, 20}, {30, 40}};

    long[] expectTime = {1, 2};
    String[] expectDevice = {"d1", "d1"};
    int[] expectValue = {10, 20};

    verifyOperatorOutput(
        timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
  }
+
  /** With no partition channels the whole input is one partition, i.e. a global LIMIT K. */
  @Test
  public void testNoPartition() {
    // No partition columns → global limit K=3
    long[][] timeArray = {{1, 2, 3, 4, 5}};
    String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}};
    int[][] valueArray = {{10, 20, 30, 40, 50}};

    long[] expectTime = {1, 2, 3};
    String[] expectDevice = {"d1", "d1", "d2"};
    int[] expectValue = {10, 20, 30};

    verifyOperatorOutputNoPartition(
        timeArray, deviceArray, valueArray, 3, false, expectTime, expectDevice, expectValue);
  }
+
  /** A single block where only a subset of rows passes: the operator must filter within a block. */
  @Test
  public void testPartialFilterInBlock() {
    // Block has mixed: some rows pass, some are filtered
    // d1: 4 rows, K=2 → only first 2 of d1 pass; d2: 2 rows, K=2 → both pass
    long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
    String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2"}};
    int[][] valueArray = {{10, 20, 30, 40, 50, 60}};

    long[] expectTime = {1, 2, 5, 6};
    String[] expectDevice = {"d1", "d1", "d2", "d2"};
    int[] expectValue = {10, 20, 50, 60};

    verifyOperatorOutput(
        timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
  }
+
  /** The child may return null from next(); the operator must skip null blocks without losing rows. */
  @Test
  public void testNullInputBlock() {
    // ChildOperator can return null TsBlocks
    long[][] timeArray = {{1, 2}, null, {3, 4}};
    String[][] deviceArray = {{"d1", "d1"}, null, {"d2", "d2"}};
    int[][] valueArray = {{10, 20}, null, {30, 40}};

    long[] expectTime = {1, 2, 3, 4};
    String[] expectDevice = {"d1", "d1", "d2", "d2"};
    int[] expectValue = {10, 20, 30, 40};

    verifyOperatorOutput(
        timeArray, deviceArray, valueArray, 5, false, expectTime, expectDevice, expectValue, null);
  }
+
+  // ======================== Helper Methods ========================
+
  /**
   * Drives a partition-by-device operator (see createOperator with hasPartition=true) to
   * completion and compares every emitted row against the expected columns.
   *
   * @param timeArray input time values, one inner array per child TsBlock (null = null block)
   * @param deviceArray input device ids, parallel to timeArray
   * @param valueArray input int values, parallel to timeArray
   * @param k maximum rows emitted per partition
   * @param produceRowNumber whether a row_number column is appended as column 3
   * @param expectTime expected time column, concatenated over all output blocks
   * @param expectDevice expected device column
   * @param expectValue expected value column
   * @param expectRowNumber expected row_number column, or null when not produced
   */
  private void verifyOperatorOutput(
      long[][] timeArray,
      String[][] deviceArray,
      int[][] valueArray,
      int k,
      boolean produceRowNumber,
      long[] expectTime,
      String[] expectDevice,
      int[] expectValue,
      long[] expectRowNumber) {
    // Running index over the expected arrays across all output blocks.
    int count = 0;
    try (LimitKRankingOperator operator =
        createOperator(timeArray, deviceArray, valueArray, k, produceRowNumber, true)) {
      ListenableFuture<?> listenableFuture = operator.isBlocked();
      listenableFuture.get();
      while (!operator.isFinished() && operator.hasNext()) {
        TsBlock tsBlock = operator.next();
        // next() may legitimately return null or an empty block between productive calls.
        if (tsBlock != null && !tsBlock.isEmpty()) {
          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
            assertEquals(expectTime[count], tsBlock.getColumn(0).getLong(i));
            assertEquals(
                expectDevice[count],
                tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
            assertEquals(expectValue[count], tsBlock.getColumn(2).getInt(i));
            if (produceRowNumber && expectRowNumber != null) {
              assertEquals(expectRowNumber[count], tsBlock.getColumn(3).getLong(i));
            }
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
      fail(e.getMessage());
    }
    // Ensure no expected rows were silently dropped.
    assertEquals(expectTime.length, count);
  }
+
  /**
   * Same verification loop as verifyOperatorOutput but with hasPartition=false, i.e. the
   * operator treats all input as a single partition (global limit).
   *
   * <p>NOTE(review): this duplicates verifyOperatorOutput except for the hasPartition flag and
   * the absent row-number checks; consider merging the two helpers by threading the flag.
   */
  private void verifyOperatorOutputNoPartition(
      long[][] timeArray,
      String[][] deviceArray,
      int[][] valueArray,
      int k,
      boolean produceRowNumber,
      long[] expectTime,
      String[] expectDevice,
      int[] expectValue) {
    // Running index over the expected arrays across all output blocks.
    int count = 0;
    try (LimitKRankingOperator operator =
        createOperator(timeArray, deviceArray, valueArray, k, produceRowNumber, false)) {
      ListenableFuture<?> listenableFuture = operator.isBlocked();
      listenableFuture.get();
      while (!operator.isFinished() && operator.hasNext()) {
        TsBlock tsBlock = operator.next();
        if (tsBlock != null && !tsBlock.isEmpty()) {
          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
            assertEquals(expectTime[count], tsBlock.getColumn(0).getLong(i));
            assertEquals(
                expectDevice[count],
                tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
            assertEquals(expectValue[count], tsBlock.getColumn(2).getInt(i));
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
      fail(e.getMessage());
    }
    // Ensure no expected rows were silently dropped.
    assertEquals(expectTime.length, count);
  }
+
  /**
   * Builds a LimitKRankingOperator over a stub child that replays the given arrays as TsBlocks.
   * Output channels are (time, device, value); when hasPartition is true, the TEXT device
   * column (channel 1) is the single partition key.
   */
  private LimitKRankingOperator createOperator(
      long[][] timeArray,
      String[][] deviceArray,
      int[][] valueArray,
      int k,
      boolean produceRowNumber,
      boolean hasPartition) {
    // Minimal fragment-instance scaffolding required to obtain an OperatorContext.
    QueryId queryId = new QueryId("stub_query");
    FragmentInstanceId instanceId =
        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
    FragmentInstanceStateMachine stateMachine =
        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
    FragmentInstanceContext fragmentInstanceContext =
        createFragmentInstanceContext(instanceId, stateMachine);
    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
    PlanNodeId planNodeId = new PlanNodeId("1");
    driverContext.addOperatorContext(1, planNodeId, LimitKRankingOperator.class.getSimpleName());

    Operator childOperator = new ChildOperator(timeArray, deviceArray, valueArray, driverContext);

    List<Integer> outputChannels = Arrays.asList(0, 1, 2);
    List<Integer> partitionChannels;
    List<TSDataType> partitionTSDataTypes;
    if (hasPartition) {
      // Partition by the device column only.
      partitionChannels = Collections.singletonList(1);
      partitionTSDataTypes = Collections.singletonList(TSDataType.TEXT);
    } else {
      partitionChannels = Collections.emptyList();
      partitionTSDataTypes = Collections.emptyList();
    }

    return new LimitKRankingOperator(
        driverContext.getOperatorContexts().get(0),
        childOperator,
        INPUT_DATA_TYPES,
        outputChannels,
        partitionChannels,
        partitionTSDataTypes,
        k,
        produceRowNumber,
        16);
  }
+
  /**
   * Stub upstream operator that replays the given arrays one TsBlock per inner array. A null
   * inner array yields a null block from next(), exercising the null-block handling of the
   * operator under test. Memory-accounting methods all report zero.
   */
  static class ChildOperator implements Operator {
    // Index of the next block to emit.
    private int index;
    private final long[][] timeArray;
    private final String[][] deviceArray;
    private final int[][] valueArray;
    private final DriverContext driverContext;

    ChildOperator(
        long[][] timeArray,
        String[][] deviceArray,
        int[][] valueArray,
        DriverContext driverContext) {
      this.timeArray = timeArray;
      this.deviceArray = deviceArray;
      this.valueArray = valueArray;
      this.driverContext = driverContext;
      this.index = 0;
    }

    @Override
    public OperatorContext getOperatorContext() {
      // Reuses the single context registered by createOperator.
      return driverContext.getOperatorContexts().get(0);
    }

    @Override
    public TsBlock next() {
      // Deliberately emit a null block for a null entry (still consumes the slot).
      if (timeArray[index] == null) {
        index++;
        return null;
      }
      TsBlockBuilder builder =
          new TsBlockBuilder(
              timeArray[index].length,
              Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32));
      for (int i = 0, size = timeArray[index].length; i < size; i++) {
        builder.getColumnBuilder(0).writeLong(timeArray[index][i]);
        builder
            .getColumnBuilder(1)
            .writeBinary(new Binary(deviceArray[index][i], TSFileConfig.STRING_CHARSET));
        builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
      }
      builder.declarePositions(timeArray[index].length);
      index++;
      return builder.build(
          new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, builder.getPositionCount()));
    }

    @Override
    public boolean hasNext() {
      return index < timeArray.length;
    }

    @Override
    public boolean isFinished() {
      return index >= timeArray.length;
    }

    @Override
    public void close() {}

    @Override
    public long calculateMaxPeekMemory() {
      return 0;
    }

    @Override
    public long calculateMaxReturnSize() {
      return 0;
    }

    @Override
    public long calculateRetainedSizeAfterCallingNext() {
      return 0;
    }

    @Override
    public long ramBytesUsed() {
      return 0;
    }
  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
index e31f2f7e580..66b2437e225 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
@@ -31,6 +31,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
 import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
 import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group;
 import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limit;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limitKRanking;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
 import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
 import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
 import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.rowNumber;
@@ -296,5 +298,120 @@ public class WindowFunctionOptimizationTest {
     assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange())));
     assertPlan(planTester.getFragmentPlan(1), rowNumber(tableScan));
     assertPlan(planTester.getFragmentPlan(2), rowNumber(tableScan));
+    assertPlan(planTester.getFragmentPlan(3), rowNumber(tableScan));
+  }
+
+  @Test
+  public void testLimitKRankingPushDownFilterOrderByTime() {
+    PlanTester planTester = new PlanTester();
+
+    // ORDER BY time triggers LimitKRankingNode instead of TopKRankingNode
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER 
BY time) as rn FROM table1) WHERE rn <= 3";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    /*
+     *   └──OutputNode
+     *         └──LimitKRankingNode
+     *              └──SortNode
+     *                   └──TableScanNode
+     */
+    assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan))));
+
+    /*  OutputNode
+     *     └──LimitKRankingNode
+     *         └──CollectNode
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               └──ExchangeNode
+     *                        └──TableScan
+     */
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(limitKRanking(collect(exchange(), exchange(), exchange()))));
+    assertPlan(planTester.getFragmentPlan(1), tableScan);
+    assertPlan(planTester.getFragmentPlan(2), tableScan);
+    assertPlan(planTester.getFragmentPlan(3), tableScan);
+  }
+
+  @Test
+  public void testLimitKRankingPushDownFilterOrderByTimeWithAllTags() {
+    PlanTester planTester = new PlanTester();
+
+    // All tag columns in PARTITION BY + ORDER BY time → LimitKRankingNode with GroupNode push down
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, 
tag3 ORDER BY time) as rn FROM table1) WHERE rn <= 2";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    /*
+     *   └──OutputNode
+     *        └──LimitKRankingNode
+     *              └──GroupNode
+     *                   └──TableScanNode
+     */
+    assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan))));
+
+    /*  OutputNode
+     *         └──CollectNode
+     *               ├──ExchangeNode
+     *               │    └──LimitKRankingNode
+     *               │        └──TableScan
+     *               ├──ExchangeNode
+     *               │    └──LimitKRankingNode
+     *               │        └──TableScan
+     *               └──ExchangeNode
+     *                    └──LimitKRankingNode
+     *                        └──MergeSortNode
+     *                            └──TableScan
+     */
+    assertPlan(
+        planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), 
exchange()))));
+    assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan));
+    assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan));
+    assertPlan(planTester.getFragmentPlan(3), limitKRanking(mergeSort(exchange(), exchange())));
+    assertPlan(planTester.getFragmentPlan(4), tableScan);
+    assertPlan(planTester.getFragmentPlan(5), tableScan);
+  }
+
+  @Test
+  public void testLimitKRankingPushDownLimitOrderByTime() {
+    PlanTester planTester = new PlanTester();
+
+    // LIMIT pushdown + ORDER BY time → LimitKRankingNode
+    String sql =
+        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER 
BY time) as rn FROM table1) LIMIT 5";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    /*
+     *   └──OutputNode
+     *        └──LimitNode
+     *            └──LimitKRankingNode
+     *                └──SortNode
+     *                    └──TableScanNode
+     */
+    assertPlan(logicalQueryPlan, output(limit(5, limitKRanking(sort(tableScan)))));
+
+    /*  OutputNode
+     *    └──LimitNode
+     *      └──LimitKRankingNode
+     *          └──CollectNode
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               ├──ExchangeNode
+     *               │        └──TableScan
+     *               └──ExchangeNode
+     *                        └──TableScan
+     */
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(limit(5, limitKRanking(collect(exchange(), exchange(), exchange())))));
+    assertPlan(planTester.getFragmentPlan(1), tableScan);
+    assertPlan(planTester.getFragmentPlan(2), tableScan);
+    assertPlan(planTester.getFragmentPlan(3), tableScan);
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec2..f2b73d74459 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
@@ -477,6 +478,10 @@ public final class PlanMatchPattern {
     return node(TopKRankingNode.class, source);
   }
 
+  public static PlanMatchPattern limitKRanking(PlanMatchPattern source) {
+    return node(LimitKRankingNode.class, source);
+  }
+
   public static PlanMatchPattern rowNumber(PlanMatchPattern source) {
     return node(RowNumberNode.class, source);
   }

Reply via email to