This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch feat/szh/support_dense_in_window_optim
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b32a38eeadedac9adadeb3ea1ffa95ad0a68558
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Sat Feb 21 14:25:45 2026 +0800

    Support rank function in TopK optimization.
---
 .../it/db/it/IoTDBWindowFunction3IT.java           |  80 ++-
 .../operator/GroupedTopNRankAccumulator.java       | 735 +++++++++++++++++++++
 .../execution/operator/GroupedTopNRankBuilder.java | 219 ++++++
 .../operator/RowIdComparisonHashStrategy.java      |   8 +
 .../SimpleTsBlockWithPositionEqualsAndHash.java    | 129 ++++
 .../execution/operator/TopNPeerGroupLookup.java    | 382 +++++++++++
 .../operator/TsBlockWithPositionEqualsAndHash.java |  28 +
 .../process/window/TopKRankingOperator.java        |  28 +-
 .../planner/optimizations/SortElimination.java     |  22 +
 .../optimizations/TransformSortToStreamSort.java   |  14 +
 .../process/window/TopKRankingOperatorTest.java    | 696 +++++++++++++++++++
 .../planner/WindowFunctionOptimizationTest.java    |   4 +-
 12 files changed, 2332 insertions(+), 13 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
index d461f3a11fe..52abba36293 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
@@ -121,8 +121,8 @@ public class IoTDBWindowFunction3IT {
     String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
     String[] retArray =
         new String[] {
-          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
           "2021-01-01T09:05:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
           "2021-01-01T09:08:00.000Z,d2,2.0,1,",
           "2021-01-01T09:15:00.000Z,d2,4.0,2,",
         };
@@ -166,6 +166,84 @@ public class IoTDBWindowFunction3IT {
         DATABASE_NAME);
   }
 
+  @Test
+  public void testPushDownFilterIntoWindowWithRank() {
+    // Data: d1 values {3,5,3,1}, d2 values {2,4}
+    // rank(PARTITION BY device ORDER BY value):
+    //   d1: 1.0→rank=1, 3.0→rank=2, 3.0→rank=2, 5.0→rank=4
+    //   d2: 2.0→rank=1, 4.0→rank=2
+    // WHERE rk <= 2: keeps d1 rows with rank≤2 (3 rows due to tie), d2 all (2 
rows)
+    String[] expectedHeader = new String[] {"time", "device", "value", "rk"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY 
value) as rk FROM demo) WHERE rk <= 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testPushDownLimitIntoWindowWithRank() {
+    // TopKRanking(RANK, topN=2) keeps rank≤2 per partition, then LIMIT 2 on 
final result
+    // d1 rank≤2: 1.0(r=1), 3.0(r=2), 3.0(r=2) → 3 rows sorted by time: 
09:05,09:09,09:10
+    // d2 rank≤2: 2.0(r=1), 4.0(r=2) → 2 rows
+    // ORDER BY device, time LIMIT 2 → first 2 from d1
+    String[] expectedHeader = new String[] {"time", "device", "value", "rk"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,2,", 
"2021-01-01T09:07:00.000Z,d1,5.0,4,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY 
value) as rk FROM demo) ORDER BY device, time LIMIT 2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testRankBasic() {
+    // Verifies rank computation: ties get the same rank, gaps after ties
+    String[] expectedHeader = new String[] {"time", "device", "value", "rk"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:07:00.000Z,d1,5.0,4,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM 
demo ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testRankWithFilterEquals() {
+    // WHERE rk = 2 keeps only rows with rank exactly 2 (both d1 rows with 
value=3)
+    String[] expectedHeader = new String[] {"time", "device", "value", "rk"};
+    String[] retArray =
+        new String[] {
+          "2021-01-01T09:05:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+          "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY 
value) as rk FROM demo) WHERE rk = 2 ORDER BY device, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
   @Test
   public void testRemoveRedundantWindow() {
     String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java
new file mode 100644
index 00000000000..a0830657eff
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java
@@ -0,0 +1,735 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue;
+import org.apache.iotdb.db.utils.HeapTraversal;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.function.LongConsumer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+public class GroupedTopNRankAccumulator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRankAccumulator.class);
+  private static final long UNKNOWN_INDEX = -1;
+  private static final long NULL_GROUP_ID = -1;
+
+  private final GroupIdToHeapBuffer groupIdToHeapBuffer = new 
GroupIdToHeapBuffer();
+  private final HeapNodeBuffer heapNodeBuffer = new HeapNodeBuffer();
+  private final PeerGroupBuffer peerGroupBuffer = new PeerGroupBuffer();
+  private final HeapTraversal heapTraversal = new HeapTraversal();
+
+  // Map from (Group ID, Row Value) to Heap Node Index where the value is 
stored
+  private final TopNPeerGroupLookup peerGroupLookup;
+
+  private final RowIdComparisonHashStrategy strategy;
+  private final int topN;
+  private final LongConsumer rowIdEvictionListener;
+
+  public GroupedTopNRankAccumulator(
+      RowIdComparisonHashStrategy strategy, int topN, LongConsumer 
rowIdEvictionListener) {
+    this.strategy = requireNonNull(strategy, "strategy is null");
+    this.peerGroupLookup = new TopNPeerGroupLookup(10_000, strategy, 
NULL_GROUP_ID, UNKNOWN_INDEX);
+    checkArgument(topN > 0, "topN must be greater than zero");
+    this.topN = topN;
+    this.rowIdEvictionListener =
+        requireNonNull(rowIdEvictionListener, "rowIdEvictionListener is null");
+  }
+
+  public long sizeOf() {
+    return INSTANCE_SIZE
+        + groupIdToHeapBuffer.sizeOf()
+        + heapNodeBuffer.sizeOf()
+        + peerGroupBuffer.sizeOf()
+        + heapTraversal.sizeOf()
+        + peerGroupLookup.sizeOf();
+  }
+
+  public int findFirstPositionToAdd(
+      TsBlock newPage,
+      int groupCount,
+      int[] groupIds,
+      TsBlockWithPositionComparator comparator,
+      RowReferenceTsBlockManager pageManager) {
+    int currentGroups = groupIdToHeapBuffer.getTotalGroups();
+    groupIdToHeapBuffer.allocateGroupIfNeeded(groupCount);
+
+    for (int position = 0; position < newPage.getPositionCount(); position++) {
+      int groupId = groupIds[position];
+      if (groupId >= currentGroups || 
groupIdToHeapBuffer.getHeapValueCount(groupId) < topN) {
+        return position;
+      }
+      long heapRootNodeIndex = 
groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+      if (heapRootNodeIndex == UNKNOWN_INDEX) {
+        return position;
+      }
+      long rightPageRowId = peekRootRowIdByHeapNodeIndex(heapRootNodeIndex);
+      TsBlock rightPage = pageManager.getTsBlock(rightPageRowId);
+      int rightPosition = pageManager.getPosition(rightPageRowId);
+      // If the current position is equal to or less than the current heap 
root index, then we may
+      // need to insert it
+      if (comparator.compareTo(newPage, position, rightPage, rightPosition) <= 
0) {
+        return position;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Add the specified row to this accumulator.
+   *
+   * <p>This may trigger row eviction callbacks if other rows have to be 
evicted to make space.
+   *
+   * @return true if this row was incorporated, false otherwise
+   */
+  public boolean add(int groupId, RowReference rowReference) {
+    // Insert to any existing peer groups first (heap nodes contain distinct 
values)
+    long peerHeapNodeIndex = peerGroupLookup.get(groupId, rowReference);
+    if (peerHeapNodeIndex != UNKNOWN_INDEX) {
+      directPeerGroupInsert(groupId, peerHeapNodeIndex, 
rowReference.allocateRowId());
+      if (calculateRootRank(groupId, 
groupIdToHeapBuffer.getHeapRootNodeIndex(groupId)) > topN) {
+        heapPop(groupId, rowIdEvictionListener);
+      }
+      // Return true because heapPop is guaranteed not to evict the newly 
inserted row (by
+      // definition of rank)
+      return true;
+    }
+
+    groupIdToHeapBuffer.allocateGroupIfNeeded(groupId);
+    if (groupIdToHeapBuffer.getHeapValueCount(groupId) < topN) {
+      // Always safe to insert if total number of values is still less than 
topN
+      long newPeerGroupIndex =
+          peerGroupBuffer.allocateNewNode(rowReference.allocateRowId(), 
UNKNOWN_INDEX);
+      heapInsert(groupId, newPeerGroupIndex, 1);
+      return true;
+    }
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    if (rowReference.compareTo(strategy, 
peekRootRowIdByHeapNodeIndex(heapRootNodeIndex)) < 0) {
+      // Given that total number of values >= topN, we can only consider 
values that are less than
+      // the root (otherwise topN would be violated)
+      long newPeerGroupIndex =
+          peerGroupBuffer.allocateNewNode(rowReference.allocateRowId(), 
UNKNOWN_INDEX);
+      // Rank will increase by +1 after insertion, so only need to pop if root 
rank is already ==
+      // topN.
+      if (calculateRootRank(groupId, heapRootNodeIndex) < topN) {
+        heapInsert(groupId, newPeerGroupIndex, 1);
+      } else {
+        heapPopAndInsert(groupId, newPeerGroupIndex, 1, rowIdEvictionListener);
+      }
+      return true;
+    }
+    // Row cannot be accepted because the total number of values >= topN, and 
the row is greater
+    // than the root (meaning it's rank would be at least topN+1).
+    return false;
+  }
+
+  /**
+   * Drain the contents of this accumulator to the provided output row ID and 
ranking buffer.
+   *
+   * <p>Rows will be presented in increasing rank order. Draining will not 
trigger any row eviction
+   * callbacks. After this method completion, the Accumulator will contain 
zero rows for the
+   * specified groupId.
+   *
+   * @return number of rows deposited to the output buffers
+   */
+  public long drainTo(int groupId, LongBigArray rowIdOutput, LongBigArray 
rankingOutput) {
+    long valueCount = groupIdToHeapBuffer.getHeapValueCount(groupId);
+    rowIdOutput.ensureCapacity(valueCount);
+    rankingOutput.ensureCapacity(valueCount);
+
+    // Heap is inverted to output order, so insert back to front
+    long insertionIndex = valueCount - 1;
+    while (insertionIndex >= 0) {
+      long heapRootNodeIndex = 
groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+      verify(heapRootNodeIndex != UNKNOWN_INDEX);
+
+      long peerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(heapRootNodeIndex);
+      verify(peerGroupIndex != UNKNOWN_INDEX, "Peer group should have at least 
one value");
+
+      long rank = calculateRootRank(groupId, heapRootNodeIndex);
+      do {
+        rowIdOutput.set(insertionIndex, 
peerGroupBuffer.getRowId(peerGroupIndex));
+        rankingOutput.set(insertionIndex, rank);
+        insertionIndex--;
+        peerGroupIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex);
+      } while (peerGroupIndex != UNKNOWN_INDEX);
+
+      heapPop(groupId, null);
+    }
+    return valueCount;
+  }
+
+  /**
+   * Drain the contents of this accumulator to the provided output row ID.
+   *
+   * <p>Rows will be presented in increasing rank order. Draining will not 
trigger any row eviction
+   * callbacks. After this method completion, the Accumulator will contain 
zero rows for the
+   * specified groupId.
+   *
+   * @return number of rows deposited to the output buffer
+   */
+  public long drainTo(int groupId, LongBigArray rowIdOutput) {
+    long valueCount = groupIdToHeapBuffer.getHeapValueCount(groupId);
+    rowIdOutput.ensureCapacity(valueCount);
+
+    // Heap is inverted to output order, so insert back to front
+    long insertionIndex = valueCount - 1;
+    while (insertionIndex >= 0) {
+      long heapRootNodeIndex = 
groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+      verify(heapRootNodeIndex != UNKNOWN_INDEX);
+
+      long peerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(heapRootNodeIndex);
+      verify(peerGroupIndex != UNKNOWN_INDEX, "Peer group should have at least 
one value");
+
+      do {
+        rowIdOutput.set(insertionIndex, 
peerGroupBuffer.getRowId(peerGroupIndex));
+        insertionIndex--;
+        peerGroupIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex);
+      } while (peerGroupIndex != UNKNOWN_INDEX);
+
+      heapPop(groupId, null);
+    }
+    return valueCount;
+  }
+
+  private long calculateRootRank(int groupId, long heapRootIndex) {
+    long heapValueCount = groupIdToHeapBuffer.getHeapValueCount(groupId);
+    checkArgument(heapRootIndex != UNKNOWN_INDEX, "Group does not have a 
root");
+    long rootPeerGroupCount = heapNodeBuffer.getPeerGroupCount(heapRootIndex);
+    return heapValueCount - rootPeerGroupCount + 1;
+  }
+
+  private void directPeerGroupInsert(int groupId, long heapNodeIndex, long 
rowId) {
+    long existingPeerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(heapNodeIndex);
+    long newPeerGroupIndex = peerGroupBuffer.allocateNewNode(rowId, 
existingPeerGroupIndex);
+    heapNodeBuffer.setPeerGroupIndex(heapNodeIndex, newPeerGroupIndex);
+    heapNodeBuffer.incrementPeerGroupCount(heapNodeIndex);
+    groupIdToHeapBuffer.incrementHeapValueCount(groupId);
+  }
+
+  private long peekRootRowIdByHeapNodeIndex(long heapRootNodeIndex) {
+    checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "Group has nothing to 
peek");
+    return 
peerGroupBuffer.getRowId(heapNodeBuffer.getPeerGroupIndex(heapRootNodeIndex));
+  }
+
+  private long getChildIndex(long heapNodeIndex, HeapTraversal.Child child) {
+    return child == HeapTraversal.Child.LEFT
+        ? heapNodeBuffer.getLeftChildHeapIndex(heapNodeIndex)
+        : heapNodeBuffer.getRightChildHeapIndex(heapNodeIndex);
+  }
+
+  private void setChildIndex(long heapNodeIndex, HeapTraversal.Child child, 
long newChildIndex) {
+    if (child == HeapTraversal.Child.LEFT) {
+      heapNodeBuffer.setLeftChildHeapIndex(heapNodeIndex, newChildIndex);
+    } else {
+      heapNodeBuffer.setRightChildHeapIndex(heapNodeIndex, newChildIndex);
+    }
+  }
+
+  /**
+   * Pop the root node off the group ID's max heap.
+   *
+   * @param contextEvictionListener optional callback for the root node that 
gets popped off
+   */
+  private void heapPop(int groupId, LongConsumer contextEvictionListener) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "Group ID has an empty 
heap");
+
+    long lastHeapNodeIndex = heapDetachLastInsertionLeaf(groupId);
+    long lastPeerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(lastHeapNodeIndex);
+    long lastPeerGroupCount = 
heapNodeBuffer.getPeerGroupCount(lastHeapNodeIndex);
+
+    if (lastHeapNodeIndex == heapRootNodeIndex) {
+      // The root is the last node remaining
+      dropHeapNodePeerGroup(groupId, lastHeapNodeIndex, 
contextEvictionListener);
+    } else {
+      // Pop the root and insert the last peer group back into the heap to 
ensure a balanced tree
+      heapPopAndInsert(groupId, lastPeerGroupIndex, lastPeerGroupCount, 
contextEvictionListener);
+    }
+
+    // peerGroupLookup entry will be updated by definition of inserting the 
last peer group into a
+    // new node
+    heapNodeBuffer.deallocate(lastHeapNodeIndex);
+  }
+
+  /**
+   * Detaches (but does not deallocate) the leaf in the bottom right-most 
position in the heap.
+   *
+   * <p>Given the fixed insertion order, the bottom right-most leaf will 
correspond to the last leaf
+   * node inserted into the balanced heap.
+   *
+   * @return leaf node index that was detached from the heap
+   */
+  private long heapDetachLastInsertionLeaf(int groupId) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    long heapSize = groupIdToHeapBuffer.getHeapSize(groupId);
+
+    long previousNodeIndex = UNKNOWN_INDEX;
+    HeapTraversal.Child childPosition = null;
+    long currentNodeIndex = heapRootNodeIndex;
+
+    heapTraversal.resetWithPathTo(heapSize);
+    while (!heapTraversal.isTarget()) {
+      previousNodeIndex = currentNodeIndex;
+      childPosition = heapTraversal.nextChild();
+      currentNodeIndex = getChildIndex(currentNodeIndex, childPosition);
+      verify(currentNodeIndex != UNKNOWN_INDEX, "Target node must exist");
+    }
+
+    // Detach the last insertion leaf node, but do not deallocate yet
+    if (previousNodeIndex == UNKNOWN_INDEX) {
+      // Last insertion leaf was the root node
+      groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, UNKNOWN_INDEX);
+      groupIdToHeapBuffer.setHeapValueCount(groupId, 0);
+      groupIdToHeapBuffer.setHeapSize(groupId, 0);
+    } else {
+      setChildIndex(previousNodeIndex, childPosition, UNKNOWN_INDEX);
+      groupIdToHeapBuffer.addHeapValueCount(
+          groupId, -heapNodeBuffer.getPeerGroupCount(currentNodeIndex));
+      groupIdToHeapBuffer.addHeapSize(groupId, -1);
+    }
+
+    return currentNodeIndex;
+  }
+
+  /**
+   * Inserts a new row into the heap for the specified group ID.
+   *
+   * <p>The technique involves traversing the heap from the root to a new 
bottom left-priority leaf
+   * position, potentially swapping heap nodes along the way to find the 
proper insertion position
+   * for the new row. Insertions always fill the left child before the right, 
and fill up an entire
+   * heap level before moving to the next level.
+   */
+  private void heapInsert(int groupId, long newPeerGroupIndex, long 
newPeerGroupCount) {
+    long newCanonicalRowId = peerGroupBuffer.getRowId(newPeerGroupIndex);
+
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    if (heapRootNodeIndex == UNKNOWN_INDEX) {
+      // Heap is currently empty, so this will be the first node
+      heapRootNodeIndex = heapNodeBuffer.allocateNewNode(newPeerGroupIndex, 
newPeerGroupCount);
+      verify(peerGroupLookup.put(groupId, newCanonicalRowId, 
heapRootNodeIndex) == UNKNOWN_INDEX);
+      groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, heapRootNodeIndex);
+      groupIdToHeapBuffer.setHeapValueCount(groupId, newPeerGroupCount);
+      groupIdToHeapBuffer.setHeapSize(groupId, 1);
+      return;
+    }
+
+    long previousHeapNodeIndex = UNKNOWN_INDEX;
+    HeapTraversal.Child childPosition = null;
+    long currentHeapNodeIndex = heapRootNodeIndex;
+    boolean swapped = false;
+
+    groupIdToHeapBuffer.addHeapValueCount(groupId, newPeerGroupCount);
+    groupIdToHeapBuffer.incrementHeapSize(groupId);
+    heapTraversal.resetWithPathTo(groupIdToHeapBuffer.getHeapSize(groupId));
+    while (!heapTraversal.isTarget()) {
+      long peerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(currentHeapNodeIndex);
+      long currentCanonicalRowId = peerGroupBuffer.getRowId(peerGroupIndex);
+      // We can short-circuit the check if a parent has already been swapped 
because the new row to
+      // insert must
+      // be greater than all of it's children.
+      if (swapped || strategy.compare(newCanonicalRowId, 
currentCanonicalRowId) > 0) {
+        long peerGroupCount = 
heapNodeBuffer.getPeerGroupCount(currentHeapNodeIndex);
+
+        // Swap the peer groups
+        heapNodeBuffer.setPeerGroupIndex(currentHeapNodeIndex, 
newPeerGroupIndex);
+        heapNodeBuffer.setPeerGroupCount(currentHeapNodeIndex, 
newPeerGroupCount);
+        peerGroupLookup.put(groupId, newCanonicalRowId, currentHeapNodeIndex);
+
+        newPeerGroupIndex = peerGroupIndex;
+        newPeerGroupCount = peerGroupCount;
+        newCanonicalRowId = currentCanonicalRowId;
+        swapped = true;
+      }
+
+      previousHeapNodeIndex = currentHeapNodeIndex;
+      childPosition = heapTraversal.nextChild();
+      currentHeapNodeIndex = getChildIndex(currentHeapNodeIndex, 
childPosition);
+    }
+
+    verify(
+        previousHeapNodeIndex != UNKNOWN_INDEX && childPosition != null,
+        "heap must have at least one node before starting traversal");
+    verify(currentHeapNodeIndex == UNKNOWN_INDEX, "New child shouldn't exist 
yet");
+
+    long newHeapNodeIndex = heapNodeBuffer.allocateNewNode(newPeerGroupIndex, 
newPeerGroupCount);
+    peerGroupLookup.put(groupId, newCanonicalRowId, newHeapNodeIndex);
+
+    //  Link the new child to the parent
+    setChildIndex(previousHeapNodeIndex, childPosition, newHeapNodeIndex);
+  }
+
+  /**
+   * Pop the root off the group ID's max heap and insert the new peer group.
+   *
+   * <p>These two operations are more efficient if performed together. The 
technique involves
+   * swapping the new row into the root position, and applying a heap down 
bubbling operation to
+   * heap-ify.
+   *
+   * @param contextEvictionListener optional callback for the root node that 
gets popped off
+   */
+  private void heapPopAndInsert(
+      int groupId,
+      long newPeerGroupIndex,
+      long newPeerGroupCount,
+      LongConsumer contextEvictionListener) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    checkState(heapRootNodeIndex != UNKNOWN_INDEX, "popAndInsert() requires at 
least a root node");
+
+    // Clear contents of the root node to create a vacancy for the new peer 
group
+    groupIdToHeapBuffer.addHeapValueCount(
+        groupId, newPeerGroupCount - 
heapNodeBuffer.getPeerGroupCount(heapRootNodeIndex));
+    dropHeapNodePeerGroup(groupId, heapRootNodeIndex, contextEvictionListener);
+
+    long newCanonicalRowId = peerGroupBuffer.getRowId(newPeerGroupIndex);
+
+    long currentNodeIndex = heapRootNodeIndex;
+    while (true) {
+      long maxChildNodeIndex = 
heapNodeBuffer.getLeftChildHeapIndex(currentNodeIndex);
+      if (maxChildNodeIndex == UNKNOWN_INDEX) {
+        // Left is always inserted before right, so a missing left child means 
there can't be a
+        // right child,
+        // which means this must already be a leaf position.
+        break;
+      }
+      long maxChildPeerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(maxChildNodeIndex);
+      long maxChildCanonicalRowId = 
peerGroupBuffer.getRowId(maxChildPeerGroupIndex);
+
+      long rightChildNodeIndex = 
heapNodeBuffer.getRightChildHeapIndex(currentNodeIndex);
+      if (rightChildNodeIndex != UNKNOWN_INDEX) {
+        long rightChildPeerGroupIndex = 
heapNodeBuffer.getPeerGroupIndex(rightChildNodeIndex);
+        long rightChildCanonicalRowId = 
peerGroupBuffer.getRowId(rightChildPeerGroupIndex);
+        if (strategy.compare(rightChildCanonicalRowId, maxChildCanonicalRowId) 
> 0) {
+          maxChildNodeIndex = rightChildNodeIndex;
+          maxChildPeerGroupIndex = rightChildPeerGroupIndex;
+          maxChildCanonicalRowId = rightChildCanonicalRowId;
+        }
+      }
+
+      if (strategy.compare(newCanonicalRowId, maxChildCanonicalRowId) >= 0) {
+        // New row is greater than or equal to both children, so the heap 
invariant is satisfied by
+        // inserting the
+        // new row at this position
+        break;
+      }
+
+      // Swap the max child row value into the current node
+      heapNodeBuffer.setPeerGroupIndex(currentNodeIndex, 
maxChildPeerGroupIndex);
+      heapNodeBuffer.setPeerGroupCount(
+          currentNodeIndex, 
heapNodeBuffer.getPeerGroupCount(maxChildNodeIndex));
+      peerGroupLookup.put(groupId, maxChildCanonicalRowId, currentNodeIndex);
+
+      // Max child now has an unfilled vacancy, so continue processing with 
that as the current node
+      currentNodeIndex = maxChildNodeIndex;
+    }
+
+    heapNodeBuffer.setPeerGroupIndex(currentNodeIndex, newPeerGroupIndex);
+    heapNodeBuffer.setPeerGroupCount(currentNodeIndex, newPeerGroupCount);
+    peerGroupLookup.put(groupId, newCanonicalRowId, currentNodeIndex);
+  }
+
+  /**
+   * Deallocates all peer group associations for this heap node, leaving a 
structural husk with no
+   * contents. Assumes that any required group level metric changes are 
handled externally.
+   */
+  private void dropHeapNodePeerGroup(
+      int groupId, long heapNodeIndex, LongConsumer contextEvictionListener) {
+    long peerGroupIndex = heapNodeBuffer.getPeerGroupIndex(heapNodeIndex);
+    checkState(peerGroupIndex != UNKNOWN_INDEX, "Heap node must have at least 
one peer group");
+
+    long rowId = peerGroupBuffer.getRowId(peerGroupIndex);
+    long nextPeerIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex);
+    peerGroupBuffer.deallocate(peerGroupIndex);
+    verify(peerGroupLookup.remove(groupId, rowId) == heapNodeIndex);
+
+    if (contextEvictionListener != null) {
+      contextEvictionListener.accept(rowId);
+    }
+
+    peerGroupIndex = nextPeerIndex;
+
+    while (peerGroupIndex != UNKNOWN_INDEX) {
+      rowId = peerGroupBuffer.getRowId(peerGroupIndex);
+      nextPeerIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex);
+      peerGroupBuffer.deallocate(peerGroupIndex);
+
+      if (contextEvictionListener != null) {
+        contextEvictionListener.accept(rowId);
+      }
+
+      peerGroupIndex = nextPeerIndex;
+    }
+  }
+
+  /**
+   * Buffer abstracting a mapping from group ID to a heap. The group ID 
provides the index for all
+   * operations.
+   */
+  private static final class GroupIdToHeapBuffer {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(GroupIdToHeapBuffer.class);
+    private static final int METRICS_POSITIONS_PER_ENTRY = 2;
+    private static final int METRICS_HEAP_SIZE_OFFSET = 1;
+
+    /*
+     *  Memory layout:
+     *  [LONG] heapNodeIndex1,
+     *  [LONG] heapNodeIndex2,
+     *  ...
+     */
+    // Since we have a single element per group, this array is effectively 
indexed on group ID
+    private final LongBigArray heapIndexBuffer = new 
LongBigArray(UNKNOWN_INDEX);
+
+    /*
+     *  Memory layout:
+     *  [LONG] valueCount1, [LONG] heapSize1,
+     *  [LONG] valueCount2, [LONG] heapSize2,
+     *  ...
+     */
+    private final LongBigArray metricsBuffer = new LongBigArray(0);
+
+    private int totalGroups;
+
+    public void allocateGroupIfNeeded(int groupId) {
+      if (totalGroups > groupId) {
+        return;
+      }
+      // Group IDs generated by GroupByHash are always generated consecutively 
starting from 0, so
+      // observing a
+      // group ID N means groups [0, N] inclusive must exist.
+      totalGroups = groupId + 1;
+      heapIndexBuffer.ensureCapacity(totalGroups);
+      metricsBuffer.ensureCapacity((long) totalGroups * 
METRICS_POSITIONS_PER_ENTRY);
+    }
+
+    public int getTotalGroups() {
+      return totalGroups;
+    }
+
+    public long getHeapRootNodeIndex(int groupId) {
+      return heapIndexBuffer.get(groupId);
+    }
+
+    public void setHeapRootNodeIndex(int groupId, long heapNodeIndex) {
+      heapIndexBuffer.set(groupId, heapNodeIndex);
+    }
+
+    public long getHeapValueCount(int groupId) {
+      return metricsBuffer.get((long) groupId * METRICS_POSITIONS_PER_ENTRY);
+    }
+
+    public void setHeapValueCount(int groupId, long count) {
+      metricsBuffer.set((long) groupId * METRICS_POSITIONS_PER_ENTRY, count);
+    }
+
+    public void addHeapValueCount(int groupId, long delta) {
+      metricsBuffer.add((long) groupId * METRICS_POSITIONS_PER_ENTRY, delta);
+    }
+
+    public void incrementHeapValueCount(int groupId) {
+      metricsBuffer.increment((long) groupId * METRICS_POSITIONS_PER_ENTRY);
+    }
+
+    public long getHeapSize(int groupId) {
+      return metricsBuffer.get(
+          (long) groupId * METRICS_POSITIONS_PER_ENTRY + 
METRICS_HEAP_SIZE_OFFSET);
+    }
+
+    public void setHeapSize(int groupId, long size) {
+      metricsBuffer.set(
+          (long) groupId * METRICS_POSITIONS_PER_ENTRY + 
METRICS_HEAP_SIZE_OFFSET, size);
+    }
+
+    public void addHeapSize(int groupId, long delta) {
+      metricsBuffer.add(
+          (long) groupId * METRICS_POSITIONS_PER_ENTRY + 
METRICS_HEAP_SIZE_OFFSET, delta);
+    }
+
+    public void incrementHeapSize(int groupId) {
+      metricsBuffer.increment(
+          (long) groupId * METRICS_POSITIONS_PER_ENTRY + 
METRICS_HEAP_SIZE_OFFSET);
+    }
+
+    public long sizeOf() {
+      return INSTANCE_SIZE + heapIndexBuffer.sizeOf() + metricsBuffer.sizeOf();
+    }
+  }
+
+  /**
+   * Buffer abstracting storage of nodes in the heap. Nodes are referenced by 
their node index for
+   * operations.
+   */
+  private static final class HeapNodeBuffer {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(HeapNodeBuffer.class);
+    private static final int POSITIONS_PER_ENTRY = 4;
+    private static final int PEER_GROUP_COUNT_OFFSET = 1;
+    private static final int LEFT_CHILD_HEAP_INDEX_OFFSET = 2;
+    private static final int RIGHT_CHILD_HEAP_INDEX_OFFSET = 3;
+
+    /*
+     *  Memory layout:
+     *  [LONG] peerGroupIndex1, [LONG] peerGroupCount1, [LONG] 
leftChildNodeIndex1, [LONG] rightChildNodeIndex1,
+     *  [LONG] peerGroupIndex2, [LONG] peerGroupCount2, [LONG] 
leftChildNodeIndex2, [LONG] rightChildNodeIndex2,
+     *  ...
+     */
+    private final LongBigArray buffer = new LongBigArray();
+
+    private final LongBigArrayFIFOQueue emptySlots = new 
LongBigArrayFIFOQueue();
+
+    private long capacity;
+
+    /**
+     * Allocates storage for a new heap node.
+     *
+     * @return index referencing the node
+     */
+    public long allocateNewNode(long peerGroupIndex, long peerGroupCount) {
+      long newHeapIndex;
+      if (!emptySlots.isEmpty()) {
+        newHeapIndex = emptySlots.dequeueLong();
+      } else {
+        newHeapIndex = capacity;
+        capacity++;
+        buffer.ensureCapacity(capacity * POSITIONS_PER_ENTRY);
+      }
+
+      setPeerGroupIndex(newHeapIndex, peerGroupIndex);
+      setPeerGroupCount(newHeapIndex, peerGroupCount);
+      setLeftChildHeapIndex(newHeapIndex, UNKNOWN_INDEX);
+      setRightChildHeapIndex(newHeapIndex, UNKNOWN_INDEX);
+
+      return newHeapIndex;
+    }
+
+    public void deallocate(long index) {
+      emptySlots.enqueue(index);
+    }
+
+    public long getActiveNodeCount() {
+      return capacity - emptySlots.longSize();
+    }
+
+    public long getPeerGroupIndex(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY);
+    }
+
+    public void setPeerGroupIndex(long index, long peerGroupIndex) {
+      buffer.set(index * POSITIONS_PER_ENTRY, peerGroupIndex);
+    }
+
+    public long getPeerGroupCount(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET);
+    }
+
+    public void setPeerGroupCount(long index, long peerGroupCount) {
+      buffer.set(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET, 
peerGroupCount);
+    }
+
+    public void incrementPeerGroupCount(long index) {
+      buffer.increment(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET);
+    }
+
+    public void addPeerGroupCount(long index, long delta) {
+      buffer.add(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET, delta);
+    }
+
+    public long getLeftChildHeapIndex(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + 
LEFT_CHILD_HEAP_INDEX_OFFSET);
+    }
+
+    public void setLeftChildHeapIndex(long index, long childHeapIndex) {
+      buffer.set(index * POSITIONS_PER_ENTRY + LEFT_CHILD_HEAP_INDEX_OFFSET, 
childHeapIndex);
+    }
+
+    public long getRightChildHeapIndex(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + 
RIGHT_CHILD_HEAP_INDEX_OFFSET);
+    }
+
+    public void setRightChildHeapIndex(long index, long childHeapIndex) {
+      buffer.set(index * POSITIONS_PER_ENTRY + RIGHT_CHILD_HEAP_INDEX_OFFSET, 
childHeapIndex);
+    }
+
+    public long sizeOf() {
+      return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf();
+    }
+  }
+
+  /**
+   * Buffer abstracting storage of peer groups as linked chains of matching 
values. Peer groups are
+   * referenced by their node index for operations.
+   */
+  private static final class PeerGroupBuffer {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(PeerGroupBuffer.class);
+    private static final int POSITIONS_PER_ENTRY = 2;
+    private static final int NEXT_PEER_INDEX_OFFSET = 1;
+
+    /*
+     *  Memory layout:
+     *  [LONG] rowId1, [LONG] nextPeerIndex1,
+     *  [LONG] rowId2, [LONG] nextPeerIndex2,
+     *  ...
+     */
+    private final LongBigArray buffer = new LongBigArray();
+
+    private final LongBigArrayFIFOQueue emptySlots = new 
LongBigArrayFIFOQueue();
+
+    private long capacity;
+
+    /**
+     * Allocates storage for a new peer group node.
+     *
+     * @return index referencing the node
+     */
+    public long allocateNewNode(long rowId, long nextPeerIndex) {
+      long newPeerIndex;
+      if (!emptySlots.isEmpty()) {
+        newPeerIndex = emptySlots.dequeueLong();
+      } else {
+        newPeerIndex = capacity;
+        capacity++;
+        buffer.ensureCapacity(capacity * POSITIONS_PER_ENTRY);
+      }
+
+      setRowId(newPeerIndex, rowId);
+      setNextPeerIndex(newPeerIndex, nextPeerIndex);
+
+      return newPeerIndex;
+    }
+
+    public void deallocate(long index) {
+      emptySlots.enqueue(index);
+    }
+
+    public long getActiveNodeCount() {
+      return capacity - emptySlots.longSize();
+    }
+
+    public long getRowId(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY);
+    }
+
+    public void setRowId(long index, long rowId) {
+      buffer.set(index * POSITIONS_PER_ENTRY, rowId);
+    }
+
+    public long getNextPeerIndex(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + NEXT_PEER_INDEX_OFFSET);
+    }
+
+    public void setNextPeerIndex(long index, long nextPeerIndex) {
+      buffer.set(index * POSITIONS_PER_ENTRY + NEXT_PEER_INDEX_OFFSET, 
nextPeerIndex);
+    }
+
+    public long sizeOf() {
+      return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java
new file mode 100644
index 00000000000..dcfc8dbcee1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+
+/**
+ * Finds the top N rows by rank value for each group. Unlike row_number which 
assigns unique
+ * sequential numbers, rank assigns the same number to rows with equal sort 
key values (peers).
+ */
+public class GroupedTopNRankBuilder implements GroupedTopNBuilder {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRankBuilder.class);
+
+  private final List<TSDataType> sourceTypes;
+  private final boolean produceRanking;
+  private final int[] groupByChannels;
+  private final GroupByHash groupByHash;
+  private final TsBlockWithPositionComparator comparator;
+  private final RowReferenceTsBlockManager tsBlockManager = new 
RowReferenceTsBlockManager();
+  private final GroupedTopNRankAccumulator groupedTopNRankAccumulator;
+
+  private int effectiveGroupCount = 0;
+
+  public GroupedTopNRankBuilder(
+      List<TSDataType> sourceTypes,
+      TsBlockWithPositionComparator comparator,
+      TsBlockWithPositionEqualsAndHash equalsAndHash,
+      int topN,
+      boolean produceRanking,
+      int[] groupByChannels,
+      GroupByHash groupByHash) {
+    this.sourceTypes = sourceTypes;
+    this.produceRanking = produceRanking;
+    this.groupByChannels = groupByChannels;
+    this.groupByHash = groupByHash;
+    this.comparator = comparator;
+
+    this.groupedTopNRankAccumulator =
+        new GroupedTopNRankAccumulator(
+            new RowIdComparisonHashStrategy() {
+              @Override
+              public int compare(long leftRowId, long rightRowId) {
+                TsBlock leftTsBlock = tsBlockManager.getTsBlock(leftRowId);
+                int leftPosition = tsBlockManager.getPosition(leftRowId);
+                TsBlock rightTsBlock = tsBlockManager.getTsBlock(rightRowId);
+                int rightPosition = tsBlockManager.getPosition(rightRowId);
+                return comparator.compareTo(leftTsBlock, leftPosition, 
rightTsBlock, rightPosition);
+              }
+
+              @Override
+              public boolean equals(long leftRowId, long rightRowId) {
+                TsBlock leftTsBlock = tsBlockManager.getTsBlock(leftRowId);
+                int leftPosition = tsBlockManager.getPosition(leftRowId);
+                TsBlock rightTsBlock = tsBlockManager.getTsBlock(rightRowId);
+                int rightPosition = tsBlockManager.getPosition(rightRowId);
+                return equalsAndHash.equals(leftTsBlock, leftPosition, 
rightTsBlock, rightPosition);
+              }
+
+              @Override
+              public long hashCode(long rowId) {
+                TsBlock tsBlock = tsBlockManager.getTsBlock(rowId);
+                int position = tsBlockManager.getPosition(rowId);
+                return equalsAndHash.hashCode(tsBlock, position);
+              }
+            },
+            topN,
+            tsBlockManager::dereference);
+  }
+
+  @Override
+  public void addTsBlock(TsBlock tsBlock) {
+    int[] groupIds;
+    if (groupByChannels.length == 0) {
+      groupIds = new int[tsBlock.getPositionCount()];
+      if (tsBlock.getPositionCount() > 0) {
+        effectiveGroupCount = 1;
+      }
+    } else {
+      groupIds = groupByHash.getGroupIds(tsBlock.getColumns(groupByChannels));
+      effectiveGroupCount = groupByHash.getGroupCount();
+    }
+
+    processTsBlock(tsBlock, effectiveGroupCount, groupIds);
+  }
+
+  @Override
+  public Iterator<TsBlock> getResult() {
+    return new ResultIterator();
+  }
+
+  @Override
+  public long getEstimatedSizeInBytes() {
+    return INSTANCE_SIZE
+        + groupByHash.getEstimatedSize()
+        + tsBlockManager.sizeOf()
+        + groupedTopNRankAccumulator.sizeOf();
+  }
+
+  private void processTsBlock(TsBlock newTsBlock, int groupCount, int[] 
groupIds) {
+    int firstPositionToAdd =
+        groupedTopNRankAccumulator.findFirstPositionToAdd(
+            newTsBlock, groupCount, groupIds, comparator, tsBlockManager);
+    if (firstPositionToAdd < 0) {
+      return;
+    }
+
+    try (RowReferenceTsBlockManager.LoadCursor loadCursor =
+        tsBlockManager.add(newTsBlock, firstPositionToAdd)) {
+      for (int position = firstPositionToAdd;
+          position < newTsBlock.getPositionCount();
+          position++) {
+        int groupId = groupIds[position];
+        loadCursor.advance();
+        groupedTopNRankAccumulator.add(groupId, loadCursor);
+      }
+    }
+
+    tsBlockManager.compactIfNeeded();
+  }
+
+  private class ResultIterator extends AbstractIterator<TsBlock> {
+    private final TsBlockBuilder tsBlockBuilder;
+    private final int groupIdCount = effectiveGroupCount;
+    private int currentGroupId = -1;
+    private final LongBigArray rowIdOutput = new LongBigArray();
+    private final LongBigArray rankingOutput = new LongBigArray();
+    private long currentGroupSize;
+    private int currentIndexInGroup;
+
+    ResultIterator() {
+      ImmutableList.Builder<TSDataType> sourceTypesBuilders =
+          ImmutableList.<TSDataType>builder().addAll(sourceTypes);
+      if (produceRanking) {
+        sourceTypesBuilders.add(TSDataType.INT64);
+      }
+      tsBlockBuilder = new TsBlockBuilder(sourceTypesBuilders.build());
+    }
+
+    @Override
+    protected TsBlock computeNext() {
+      tsBlockBuilder.reset();
+      while (!tsBlockBuilder.isFull()) {
+        while (currentIndexInGroup >= currentGroupSize) {
+          if (currentGroupId + 1 >= groupIdCount) {
+            if (tsBlockBuilder.isEmpty()) {
+              return endOfData();
+            }
+            return tsBlockBuilder.build(
+                new RunLengthEncodedColumn(
+                    TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount()));
+          }
+          currentGroupId++;
+          currentGroupSize =
+              produceRanking
+                  ? groupedTopNRankAccumulator.drainTo(currentGroupId, 
rowIdOutput, rankingOutput)
+                  : groupedTopNRankAccumulator.drainTo(currentGroupId, 
rowIdOutput);
+          currentIndexInGroup = 0;
+        }
+
+        long rowId = rowIdOutput.get(currentIndexInGroup);
+        TsBlock tsBlock = tsBlockManager.getTsBlock(rowId);
+        int position = tsBlockManager.getPosition(rowId);
+        for (int i = 0; i < sourceTypes.size(); i++) {
+          ColumnBuilder builder = tsBlockBuilder.getColumnBuilder(i);
+          Column column = tsBlock.getColumn(i);
+          builder.write(column, position);
+        }
+        if (produceRanking) {
+          ColumnBuilder builder = 
tsBlockBuilder.getColumnBuilder(sourceTypes.size());
+          builder.writeLong(rankingOutput.get(currentIndexInGroup));
+        }
+        tsBlockBuilder.declarePosition();
+        currentIndexInGroup++;
+
+        tsBlockManager.dereference(rowId);
+      }
+
+      if (tsBlockBuilder.isEmpty()) {
+        return endOfData();
+      }
+      return tsBlockBuilder.build(
+          new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
tsBlockBuilder.getPositionCount()));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java
new file mode 100644
index 00000000000..8e5fd2a7942
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+public interface RowIdComparisonHashStrategy extends RowIdComparisonStrategy, 
RowIdHashStrategy {
+  @Override
+  default boolean equals(long leftRowId, long rightRowId) {
+    return compare(leftRowId, rightRowId) == 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionEqualsAndHash.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionEqualsAndHash.java
new file mode 100644
index 00000000000..5b9aa6c3b4c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionEqualsAndHash.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Computes equality and hash based on specified column channels of a TsBlock. 
Used for peer group
+ * detection in RANK window functions.
+ */
+public class SimpleTsBlockWithPositionEqualsAndHash implements 
TsBlockWithPositionEqualsAndHash {
+  private final List<Integer> channels;
+  private final List<TSDataType> types;
+
+  public SimpleTsBlockWithPositionEqualsAndHash(List<TSDataType> allTypes, 
List<Integer> channels) {
+    this.channels = channels;
+    this.types = new ArrayList<>(channels.size());
+    for (int channel : channels) {
+      types.add(allTypes.get(channel));
+    }
+  }
+
+  @Override
+  public boolean equals(TsBlock left, int leftPosition, TsBlock right, int 
rightPosition) {
+    for (int i = 0; i < channels.size(); i++) {
+      int channel = channels.get(i);
+      Column leftColumn = left.getColumn(channel);
+      Column rightColumn = right.getColumn(channel);
+
+      boolean leftNull = leftColumn.isNull(leftPosition);
+      boolean rightNull = rightColumn.isNull(rightPosition);
+      if (leftNull != rightNull) {
+        return false;
+      }
+      if (leftNull) {
+        continue;
+      }
+
+      if (!valueEquals(leftColumn, leftPosition, rightColumn, rightPosition, 
types.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public long hashCode(TsBlock block, int position) {
+    long hash = 0;
+    for (int i = 0; i < channels.size(); i++) {
+      Column column = block.getColumn(channels.get(i));
+      hash = hash * 31 + valueHash(column, position, types.get(i));
+    }
+    return hash;
+  }
+
+  private static boolean valueEquals(
+      Column left, int leftPos, Column right, int rightPos, TSDataType type) {
+    switch (type) {
+      case INT32:
+      case DATE:
+        return left.getInt(leftPos) == right.getInt(rightPos);
+      case INT64:
+      case TIMESTAMP:
+        return left.getLong(leftPos) == right.getLong(rightPos);
+      case FLOAT:
+        return Float.compare(left.getFloat(leftPos), right.getFloat(rightPos)) 
== 0;
+      case DOUBLE:
+        return Double.compare(left.getDouble(leftPos), 
right.getDouble(rightPos)) == 0;
+      case BOOLEAN:
+        return left.getBoolean(leftPos) == right.getBoolean(rightPos);
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return left.getBinary(leftPos).equals(right.getBinary(rightPos));
+      default:
+        throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+  }
+
+  private static long valueHash(Column column, int position, TSDataType type) {
+    if (column.isNull(position)) {
+      return 0;
+    }
+    switch (type) {
+      case INT32:
+      case DATE:
+        return column.getInt(position);
+      case INT64:
+      case TIMESTAMP:
+        long v = column.getLong(position);
+        return v ^ (v >>> 32);
+      case FLOAT:
+        return Float.floatToIntBits(column.getFloat(position));
+      case DOUBLE:
+        long dv = Double.doubleToLongBits(column.getDouble(position));
+        return dv ^ (dv >>> 32);
+      case BOOLEAN:
+        return column.getBoolean(position) ? 1231 : 1237;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return column.getBinary(position).hashCode();
+      default:
+        throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java
new file mode 100644
index 00000000000..dfb2e425746
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java
@@ -0,0 +1,382 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/** Optimized hash table for streaming Top N peer group lookup operations. */
+// Note: this code was forked from fastutil (http://fastutil.di.unimi.it/)
+// Long2LongOpenCustomHashMap.
+// Copyright (C) 2002-2019 Sebastiano Vigna
+public class TopNPeerGroupLookup {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TopNPeerGroupLookup.class);
+
+  /** The buffer containing key and value data. */
+  private Buffer buffer;
+
+  /** The mask for wrapping a position counter. */
+  private long mask;
+
+  /** The hash strategy. */
+  private final RowIdHashStrategy strategy;
+
+  /** The current allocated table size. */
+  private long tableSize;
+
+  /** Threshold after which we rehash. */
+  private long maxFill;
+
+  /** The acceptable load factor. */
+  private final float fillFactor;
+
+  /** Number of entries in the set. */
+  private long entryCount;
+
+  /**
+   * The value denoting unmapped group IDs. Since group IDs need to co-exist 
at all times with row
+   * IDs, we only need to use one of the two IDs to indicate that a slot is 
unused. Group IDs have
+   * been arbitrarily selected for that purpose.
+   */
+  private final long unmappedGroupId;
+
+  /** The default return value for {@code get()}, {@code put()} and {@code 
remove()}. */
+  private final long defaultReturnValue;
+
+  /**
+   * Standard hash table parameters are expected. {@code unmappedGroupId} 
specifies the internal
+   * marker value for unmapped group IDs.
+   */
+  public TopNPeerGroupLookup(
+      long expected,
+      float fillFactor,
+      RowIdHashStrategy strategy,
+      long unmappedGroupId,
+      long defaultReturnValue) {
+    checkArgument(expected >= 0, "The expected number of elements must be 
nonnegative");
+    checkArgument(
+        fillFactor > 0 && fillFactor <= 1,
+        "Load factor must be greater than 0 and smaller than or equal to 1");
+    this.fillFactor = fillFactor;
+    this.strategy = requireNonNull(strategy, "strategy is null");
+    this.unmappedGroupId = unmappedGroupId;
+    this.defaultReturnValue = defaultReturnValue;
+
+    tableSize = bigArraySize(expected, fillFactor);
+    mask = tableSize - 1;
+    maxFill = maxFill(tableSize, fillFactor);
+    buffer = new Buffer(tableSize, unmappedGroupId);
+  }
+
+  public TopNPeerGroupLookup(
+      long expected, RowIdHashStrategy strategy, long unmappedGroupId, long 
defaultReturnValue) {
+    this(expected, 0.75f, strategy, unmappedGroupId, defaultReturnValue);
+  }
+
+  /** Returns the size of this hash map in bytes. */
+  public long sizeOf() {
+    return INSTANCE_SIZE + buffer.sizeOf();
+  }
+
+  public long size() {
+    return entryCount;
+  }
+
+  public boolean isEmpty() {
+    return entryCount == 0;
+  }
+
+  public long get(long groupId, long rowId) {
+    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped 
group ID");
+
+    long hash = hash(groupId, rowId);
+    long index = hash & mask;
+    if (buffer.isEmptySlot(index)) {
+      return defaultReturnValue;
+    }
+    if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, 
index)) {
+      return buffer.getValue(index);
+    }
+    // There's always an unused entry.
+    while (true) {
+      index = (index + 1) & mask;
+      if (buffer.isEmptySlot(index)) {
+        return defaultReturnValue;
+      }
+      if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, 
index)) {
+        return buffer.getValue(index);
+      }
+    }
+  }
+
+  public long get(long groupId, RowReference rowReference) {
+    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped 
group ID");
+
+    long hash = hash(groupId, rowReference);
+    long index = hash & mask;
+    if (buffer.isEmptySlot(index)) {
+      return defaultReturnValue;
+    }
+    if (hash == buffer.getPrecomputedHash(index) && equals(groupId, 
rowReference, index)) {
+      return buffer.getValue(index);
+    }
+    // There's always an unused entry.
+    while (true) {
+      index = (index + 1) & mask;
+      if (buffer.isEmptySlot(index)) {
+        return defaultReturnValue;
+      }
+      if (hash == buffer.getPrecomputedHash(index) && equals(groupId, 
rowReference, index)) {
+        return buffer.getValue(index);
+      }
+    }
+  }
+
+  public long put(long groupId, long rowId, long value) {
+    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped 
group ID");
+
+    long hash = hash(groupId, rowId);
+
+    long index = find(groupId, rowId, hash);
+    if (index < 0) {
+      insert(twosComplement(index), groupId, rowId, hash, value);
+      return defaultReturnValue;
+    }
+    long oldValue = buffer.getValue(index);
+    buffer.setValue(index, value);
+    return oldValue;
+  }
+
+  private long hash(long groupId, long rowId) {
+    return mix(groupId * 31 + strategy.hashCode(rowId));
+  }
+
+  private long hash(long groupId, RowReference rowReference) {
+    return mix(groupId * 31 + rowReference.hash(strategy));
+  }
+
+  private boolean equals(long groupId, long rowId, long index) {
+    return groupId == buffer.getGroupId(index) && strategy.equals(rowId, 
buffer.getRowId(index));
+  }
+
+  private boolean equals(long groupId, RowReference rowReference, long index) {
+    return groupId == buffer.getGroupId(index)
+        && rowReference.equals(strategy, buffer.getRowId(index));
+  }
+
+  private void insert(long index, long groupId, long rowId, long 
precomputedHash, long value) {
+    buffer.set(index, groupId, rowId, precomputedHash, value);
+    entryCount++;
+    if (entryCount > maxFill) {
+      rehash(bigArraySize(entryCount + 1, fillFactor));
+    }
+  }
+
+  /**
+   * Locate the index for the specified {@code groupId} and {@code rowId} key 
pair. If the index is
+   * unpopulated, then return the index as the two's complement value (which 
will be negative).
+   */
+  private long find(long groupId, long rowId, long precomputedHash) {
+    long index = precomputedHash & mask;
+    if (buffer.isEmptySlot(index)) {
+      return twosComplement(index);
+    }
+    if (precomputedHash == buffer.getPrecomputedHash(index) && equals(groupId, 
rowId, index)) {
+      return index;
+    }
+    // There's always an unused entry.
+    while (true) {
+      index = (index + 1) & mask;
+      if (buffer.isEmptySlot(index)) {
+        return twosComplement(index);
+      }
+      if (precomputedHash == buffer.getPrecomputedHash(index) && 
equals(groupId, rowId, index)) {
+        return index;
+      }
+    }
+  }
+
+  public long remove(long groupId, long rowId) {
+    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped 
group ID");
+
+    long hash = hash(groupId, rowId);
+    long index = hash & mask;
+    if (buffer.isEmptySlot(index)) {
+      return defaultReturnValue;
+    }
+    if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, 
index)) {
+      return removeEntry(index);
+    }
+    while (true) {
+      index = (index + 1) & mask;
+      if (buffer.isEmptySlot(index)) {
+        return defaultReturnValue;
+      }
+      if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, 
index)) {
+        return removeEntry(index);
+      }
+    }
+  }
+
+  private long removeEntry(long index) {
+    long oldValue = buffer.getValue(index);
+    entryCount--;
+    shiftKeys(index);
+    return oldValue;
+  }
+
+  /**
+   * Shifts left entries with the specified hash code, starting at the 
specified index, and empties
+   * the resulting free entry.
+   *
+   * @param index a starting position.
+   */
+  private void shiftKeys(long index) {
+    // Shift entries with the same hash.
+    while (true) {
+      long currentHash;
+
+      long initialIndex = index;
+      index = ((index) + 1) & mask;
+      while (true) {
+        if (buffer.isEmptySlot(index)) {
+          buffer.clear(initialIndex);
+          return;
+        }
+        currentHash = buffer.getPrecomputedHash(index);
+        long slot = currentHash & mask;
+        // Yes, this is dense logic. See fastutil 
Long2LongOpenCustomHashMap#shiftKeys
+        // implementation.
+        if (initialIndex <= index
+            ? initialIndex >= slot || slot > index
+            : initialIndex >= slot && slot > index) {
+          break;
+        }
+        index = (index + 1) & mask;
+      }
+      buffer.set(
+          initialIndex,
+          buffer.getGroupId(index),
+          buffer.getRowId(index),
+          currentHash,
+          buffer.getValue(index));
+    }
+  }
+
+  private void rehash(long newTableSize) {
+    long newMask = newTableSize - 1; // Note that this is used by the hashing 
macro
+    Buffer newBuffer = new Buffer(newTableSize, unmappedGroupId);
+    long index = tableSize;
+    for (long i = entryCount; i > 0; i--) {
+      index--;
+      while (buffer.isEmptySlot(index)) {
+        index--;
+      }
+      long hash = buffer.getPrecomputedHash(index);
+      long newIndex = hash & newMask;
+      if (!newBuffer.isEmptySlot(newIndex)) {
+        newIndex = (newIndex + 1) & newMask;
+        while (!newBuffer.isEmptySlot(newIndex)) {
+          newIndex = (newIndex + 1) & newMask;
+        }
+      }
+      newBuffer.set(
+          newIndex, buffer.getGroupId(index), buffer.getRowId(index), hash, 
buffer.getValue(index));
+    }
+    tableSize = newTableSize;
+    mask = newMask;
+    maxFill = maxFill(tableSize, fillFactor);
+    buffer = newBuffer;
+  }
+
+  private static long twosComplement(long value) {
+    return -(value + 1);
+  }
+
+  private static class Buffer {
+    private static final long INSTANCE_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(Buffer.class);
+
+    private static final int POSITIONS_PER_ENTRY = 4;
+    private static final int ROW_ID_OFFSET = 1;
+    private static final int PRECOMPUTED_HASH_OFFSET = 2;
+    private static final int VALUE_OFFSET = 3;
+
+    /*
+     *  Memory layout:
+     *  [LONG] groupId1, [LONG] rowId1, [LONG] precomputedHash1, [LONG] value1
+     *  [LONG] groupId2, [LONG] rowId2, [LONG] precomputedHash2, [LONG] value2
+     *  ...
+     */
+    private final LongBigArray buffer;
+    private final long unmappedGroupId;
+
+    public Buffer(long positions, long unmappedGroupId) {
+      buffer = new LongBigArray(unmappedGroupId);
+      buffer.ensureCapacity(positions * POSITIONS_PER_ENTRY);
+      this.unmappedGroupId = unmappedGroupId;
+    }
+
+    public void set(long index, long groupId, long rowId, long 
precomputedHash, long value) {
+      buffer.set(index * POSITIONS_PER_ENTRY, groupId);
+      buffer.set(index * POSITIONS_PER_ENTRY + ROW_ID_OFFSET, rowId);
+      buffer.set(index * POSITIONS_PER_ENTRY + PRECOMPUTED_HASH_OFFSET, 
precomputedHash);
+      buffer.set(index * POSITIONS_PER_ENTRY + VALUE_OFFSET, value);
+    }
+
+    public void clear(long index) {
+      // Since all fields of an index are set/unset together as a unit, we 
only need to choose one
+      // field to serve
+      // as a marker for empty slots. Group IDs have been arbitrarily selected 
for that purpose.
+      buffer.set(index * POSITIONS_PER_ENTRY, unmappedGroupId);
+    }
+
+    public boolean isEmptySlot(long index) {
+      return getGroupId(index) == unmappedGroupId;
+    }
+
+    public long getGroupId(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY);
+    }
+
+    public long getRowId(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + ROW_ID_OFFSET);
+    }
+
+    public long getPrecomputedHash(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + PRECOMPUTED_HASH_OFFSET);
+    }
+
+    public long getValue(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + VALUE_OFFSET);
+    }
+
+    public void setValue(long index, long value) {
+      buffer.set(index * POSITIONS_PER_ENTRY + VALUE_OFFSET, value);
+    }
+
+    public long sizeOf() {
+      return INSTANCE_SIZE + buffer.sizeOf();
+    }
+  }
+
+  public static long maxFill(long n, float f) {
+    return Math.min((long) Math.ceil((double) ((float) n * f)), n - 1L);
+  }
+
+  public static long nextPowerOfTwo(long x) {
+    return 1L << 64 - Long.numberOfLeadingZeros(x - 1L);
+  }
+
+  public static long bigArraySize(long expected, float f) {
+    return nextPowerOfTwo((long) Math.ceil((double) ((float) expected / f)));
+  }
+
+  public static long mix(long x) {
+    long h = x * -7046029254386353131L;
+    h ^= h >>> 32;
+    return h ^ h >>> 16;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionEqualsAndHash.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionEqualsAndHash.java
new file mode 100644
index 00000000000..1c76a90233a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionEqualsAndHash.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+public interface TsBlockWithPositionEqualsAndHash {
+  boolean equals(TsBlock left, int leftPosition, TsBlock right, int 
rightPosition);
+
+  long hashCode(TsBlock block, int position);
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
index b94c546ac54..c703fb423b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
@@ -21,11 +21,14 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.process.window;
 
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNBuilder;
+import 
org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNRankBuilder;
 import 
org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNRowNumberBuilder;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.SimpleTsBlockWithPositionComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.SimpleTsBlockWithPositionEqualsAndHash;
 import 
org.apache.iotdb.db.queryengine.execution.operator.TsBlockWithPositionComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.TsBlockWithPositionEqualsAndHash;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
@@ -150,16 +153,21 @@ public class TopKRankingOperator implements 
ProcessOperator {
               groupByHashSupplier.get());
     }
 
-    //    if (rankingType == TopKRankingNode.RankingType.RANK) {
-    //      Comparator<TsBlock> comparator = new 
SimpleTsBlockWithPositionComparator(
-    //          sourceTypes, sortChannels, ascendingOrders);
-    //      return () -> new GroupedTopNRankBuilder(
-    //          sourceTypes,
-    //          comparator,
-    //          maxRankingPerPartition,
-    //          generateRanking,
-    //          groupByHashSupplier.get());
-    //    }
+    if (rankingType == TopKRankingNode.RankingType.RANK) {
+      TsBlockWithPositionComparator comparator =
+          new SimpleTsBlockWithPositionComparator(inputTypes, sortChannels, 
sortOrders);
+      TsBlockWithPositionEqualsAndHash equalsAndHash =
+          new SimpleTsBlockWithPositionEqualsAndHash(inputTypes, sortChannels);
+      return () ->
+          new GroupedTopNRankBuilder(
+              inputTypes,
+              comparator,
+              equalsAndHash,
+              maxRowCountPerPartition,
+              !partial,
+              partitionChannels.stream().mapToInt(Integer::intValue).toArray(),
+              groupByHashSupplier.get());
+    }
 
     if (rankingType == TopKRankingNode.RankingType.DENSE_RANK) {
       throw new UnsupportedOperationException("DENSE_RANK not yet 
implemented");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
index bb276f07150..ce37a5a9ed1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -27,8 +27,10 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableS
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
 
@@ -139,6 +141,26 @@ public class SortElimination implements PlanOptimizer {
       context.setCannotEliminateSort(true);
       return newNode;
     }
+
+    @Override
+    public PlanNode visitTopKRanking(TopKRankingNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      context.setCannotEliminateSort(true);
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitRowNumber(RowNumberNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      context.setCannotEliminateSort(true);
+      return newNode;
+    }
   }
 
   private static class Context {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index 7eb6dfb81c9..15da1d39089 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -33,8 +33,10 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
 
 import java.util.Map;
@@ -163,6 +165,18 @@ public class TransformSortToStreamSort implements 
PlanOptimizer {
       return visitTableScan(node, context);
     }
 
+    @Override
+    public PlanNode visitTopKRanking(TopKRankingNode node, Context context) {
+      context.setCanTransform(false);
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitRowNumber(RowNumberNode node, Context context) {
+      context.setCanTransform(false);
+      return visitPlan(node, context);
+    }
+
     @Override
     public PlanNode visitUnion(UnionNode node, Context context) {
       context.setCanTransform(false);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java
new file mode 100644
index 00000000000..1d50b42673d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.TreeLinearFillOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TopKRankingOperatorTest {
+  private static final ExecutorService instanceNotificationExecutor =
+      IoTDBThreadPoolFactory.newFixedThreadPool(
+          1, "topKRankingOperator-test-instance-notification");
+
+  @Test
+  public void testTopKWithPartition() {
+    // Input: 4 rows for d1, 3 rows for d2
+    // Sort by value (column 2) ascending, top 2 per partition
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 1, 4, 6, 2, 1}};
+
+    // Expected: top 2 per partition sorted by value ASC
+    // d1: value=1(rn=1), value=3(rn=2)
+    // d2: value=1(rn=1), value=2(rn=2)
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 
2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {1, 1}, new int[] {2, 
2}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        2,
+        false,
+        expectedByDevice,
+        4);
+  }
+
+  @Test
+  public void testTopKWithPartitionDescending() {
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 1, 6, 2, 4}};
+
+    // top 2 per partition sorted by value DESC
+    // d1: value=5(rn=1), value=3(rn=2)
+    // d2: value=6(rn=1), value=4(rn=2)
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {5, 1}, new int[] {3, 
2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {6, 1}, new int[] {4, 
2}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.DESC_NULLS_LAST),
+        2,
+        false,
+        expectedByDevice,
+        4);
+  }
+
+  @Test
+  public void testTopKWithoutPartition() {
+    // No partition: all rows in one group
+    long[][] timeArray = {{1, 2, 3, 4, 5}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 1, 4, 2}};
+
+    // top 3 globally sorted by value ASC: value=1(rn=1), value=2(rn=2), 
value=3(rn=3)
+    int[][] expectedValueAndRn = {{1, 1}, {2, 2}, {3, 3}};
+
+    verifyTopKResultsGlobal(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        3,
+        false,
+        expectedValueAndRn,
+        3);
+  }
+
+  @Test
+  public void testTopKWithMultipleTsBlocks() {
+    long[][] timeArray = {{1, 2, 3}, {4, 5}, {6, 7}};
+    String[][] deviceArray = {{"d1", "d1", "d1"}, {"d2", "d2"}, {"d2", "d2"}};
+    int[][] valueArray = {{5, 3, 1}, {6, 2}, {4, 1}};
+
+    // top 2 per partition sorted by value ASC
+    // d1: value=1(rn=1), value=3(rn=2)
+    // d2: value=1(rn=1), value=2(rn=2)
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 
2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {1, 1}, new int[] {2, 
2}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        2,
+        false,
+        expectedByDevice,
+        4);
+  }
+
+  @Test
+  public void testTopKWithTopOne() {
+    long[][] timeArray = {{1, 2, 3, 4}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 6, 2}};
+
+    // top 1 per partition sorted by value ASC
+    // d1: value=3(rn=1)
+    // d2: value=2(rn=1)
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Collections.singletonList(new int[] {3, 1}));
+    expectedByDevice.put("d2", Collections.singletonList(new int[] {2, 1}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        1,
+        false,
+        expectedByDevice,
+        2);
+  }
+
+  // ==================== RANK Tests ====================
+
+  @Test
+  public void testRankWithPartitionAndTies() {
+    // d1: values [5, 3, 3, 1], d2: values [6, 2, 2]
+    // topN=2 ASC → d1 keeps rank≤2: 1(r=1),3(r=2),3(r=2); d2 keeps rank≤2: 
2(r=1),2(r=1)
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 3, 1, 6, 2, 2}};
+
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 
2}, new int[] {3, 2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {2, 
1}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        2,
+        false,
+        TopKRankingNode.RankingType.RANK,
+        expectedByDevice,
+        5);
+  }
+
+  @Test
+  public void testRankWithPartitionDescendingAndTies() {
+    // d1: values [5, 3, 3, 1] DESC → 5(r=1),3(r=2),3(r=2),1(r=4) → keep rank≤2
+    // d2: values [6, 2, 4] DESC → 6(r=1),4(r=2),2(r=3) → keep rank≤2
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 3, 1, 6, 4}};
+
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {5, 1}, new int[] {3, 
2}, new int[] {3, 2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {6, 1}, new int[] {4, 
2}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.DESC_NULLS_LAST),
+        2,
+        false,
+        TopKRankingNode.RankingType.RANK,
+        expectedByDevice,
+        5);
+  }
+
+  @Test
+  public void testRankWithoutPartitionAndTies() {
+    // Global: values [5, 3, 1, 3, 2] ASC → 1(r=1),2(r=2),3(r=3),3(r=3),5(r=5) 
→ keep rank≤3
+    long[][] timeArray = {{1, 2, 3, 4, 5}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 1, 3, 2}};
+
+    int[][] expectedValueAndRank = {{1, 1}, {2, 2}, {3, 3}, {3, 3}};
+
+    verifyTopKResultsGlobal(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        3,
+        false,
+        TopKRankingNode.RankingType.RANK,
+        expectedValueAndRank,
+        4);
+  }
+
+  @Test
+  public void testRankWithMultipleTsBlocksAndTies() {
+    // Same data as testRankWithPartitionAndTies, split across blocks
+    long[][] timeArray = {{1, 2, 3}, {4, 5}, {6, 7}};
+    String[][] deviceArray = {{"d1", "d1", "d1"}, {"d1", "d2"}, {"d2", "d2"}};
+    int[][] valueArray = {{5, 3, 3}, {1, 6}, {2, 2}};
+
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 
2}, new int[] {3, 2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {2, 
1}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        2,
+        false,
+        TopKRankingNode.RankingType.RANK,
+        expectedByDevice,
+        5);
+  }
+
+  @Test
+  public void testRankTopOneWithTies() {
+    // d1: values [5, 3], d2: values [2, 2]
+    // topN=1 ASC → d1: 3(r=1); d2: 2(r=1),2(r=1) (ties at rank 1 are all kept)
+    long[][] timeArray = {{1, 2, 3, 4}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 2, 2}};
+
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Collections.singletonList(new int[] {3, 1}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {2, 
1}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        1,
+        false,
+        TopKRankingNode.RankingType.RANK,
+        expectedByDevice,
+        3);
+  }
+
+  @Test
+  public void testRankNoTiesBehavesLikeRowNumber() {
+    // When no ties, rank should produce the same results as row_number
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{5, 3, 1, 4, 6, 2, 7}};
+
+    Map<String, List<int[]>> expectedByDevice = new HashMap<>();
+    expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 
2}));
+    expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {6, 
2}));
+
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        Collections.singletonList(1),
+        Collections.singletonList(TSDataType.TEXT),
+        Collections.singletonList(2),
+        Collections.singletonList(SortOrder.ASC_NULLS_LAST),
+        2,
+        false,
+        TopKRankingNode.RankingType.RANK,
+        expectedByDevice,
+        4);
+  }
+
+  /**
+   * Verifies top-K results grouped by partition (device). The output order 
between partitions is
+   * not guaranteed, so we group results by device and verify each partition 
independently.
+   */
+  private void verifyTopKResultsByPartition(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortOrder> sortOrders,
+      int maxRowCountPerPartition,
+      boolean partial,
+      Map<String, List<int[]>> expectedByDevice,
+      int expectedTotalCount) {
+    verifyTopKResultsByPartition(
+        timeArray,
+        deviceArray,
+        valueArray,
+        partitionChannels,
+        partitionTypes,
+        sortChannels,
+        sortOrders,
+        maxRowCountPerPartition,
+        partial,
+        TopKRankingNode.RankingType.ROW_NUMBER,
+        expectedByDevice,
+        expectedTotalCount);
+  }
+
+  private void verifyTopKResultsByPartition(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortOrder> sortOrders,
+      int maxRowCountPerPartition,
+      boolean partial,
+      TopKRankingNode.RankingType rankingType,
+      Map<String, List<int[]>> expectedByDevice,
+      int expectedTotalCount) {
+
+    Map<String, List<int[]>> actualByDevice = new HashMap<>();
+    int count = 0;
+
+    try (TopKRankingOperator operator =
+        genTopKRankingOperator(
+            timeArray,
+            deviceArray,
+            valueArray,
+            partitionChannels,
+            partitionTypes,
+            sortChannels,
+            sortOrders,
+            maxRowCountPerPartition,
+            partial,
+            rankingType)) {
+      while (!operator.isFinished()) {
+        if (operator.hasNext()) {
+          TsBlock tsBlock = operator.next();
+          if (tsBlock != null && !tsBlock.isEmpty()) {
+            int numColumns = tsBlock.getValueColumnCount();
+            for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+              String device =
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
+              int value = tsBlock.getColumn(2).getInt(i);
+              long rowNumber = tsBlock.getColumn(numColumns - 1).getLong(i);
+              actualByDevice
+                  .computeIfAbsent(device, k -> new ArrayList<>())
+                  .add(new int[] {value, (int) rowNumber});
+            }
+          }
+        }
+      }
+      assertEquals(expectedTotalCount, count);
+
+      for (Map.Entry<String, List<int[]>> entry : expectedByDevice.entrySet()) 
{
+        String device = entry.getKey();
+        List<int[]> expectedRows = entry.getValue();
+        List<int[]> actualRows = actualByDevice.get(device);
+
+        assertTrue("Missing partition for device: " + device, actualRows != 
null);
+        assertEquals(
+            "Row count mismatch for device " + device, expectedRows.size(), 
actualRows.size());
+        for (int i = 0; i < expectedRows.size(); i++) {
+          assertEquals(
+              "Value mismatch at row " + i + " for device " + device,
+              expectedRows.get(i)[0],
+              actualRows.get(i)[0]);
+          assertEquals(
+              "Row number mismatch at row " + i + " for device " + device,
+              expectedRows.get(i)[1],
+              actualRows.get(i)[1]);
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void verifyTopKResultsGlobal(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortOrder> sortOrders,
+      int maxRowCountPerPartition,
+      boolean partial,
+      int[][] expectedValueAndRn,
+      int expectedTotalCount) {
+    verifyTopKResultsGlobal(
+        timeArray,
+        deviceArray,
+        valueArray,
+        partitionChannels,
+        partitionTypes,
+        sortChannels,
+        sortOrders,
+        maxRowCountPerPartition,
+        partial,
+        TopKRankingNode.RankingType.ROW_NUMBER,
+        expectedValueAndRn,
+        expectedTotalCount);
+  }
+
+  private void verifyTopKResultsGlobal(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortOrder> sortOrders,
+      int maxRowCountPerPartition,
+      boolean partial,
+      TopKRankingNode.RankingType rankingType,
+      int[][] expectedValueAndRn,
+      int expectedTotalCount) {
+
+    List<int[]> results = new ArrayList<>();
+    int count = 0;
+
+    try (TopKRankingOperator operator =
+        genTopKRankingOperator(
+            timeArray,
+            deviceArray,
+            valueArray,
+            partitionChannels,
+            partitionTypes,
+            sortChannels,
+            sortOrders,
+            maxRowCountPerPartition,
+            partial,
+            rankingType)) {
+      while (!operator.isFinished()) {
+        if (operator.hasNext()) {
+          TsBlock tsBlock = operator.next();
+          if (tsBlock != null && !tsBlock.isEmpty()) {
+            int numColumns = tsBlock.getValueColumnCount();
+            for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+              int value = tsBlock.getColumn(2).getInt(i);
+              long rowNumber = tsBlock.getColumn(numColumns - 1).getLong(i);
+              results.add(new int[] {value, (int) rowNumber});
+            }
+          }
+        }
+      }
+      assertEquals(expectedTotalCount, count);
+      for (int i = 0; i < expectedValueAndRn.length; i++) {
+        assertEquals("Value mismatch at row " + i, expectedValueAndRn[i][0], 
results.get(i)[0]);
+        assertEquals(
+            "Row number mismatch at row " + i, expectedValueAndRn[i][1], 
results.get(i)[1]);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private DriverContext createDriverContext() {
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNode = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNode, 
TreeLinearFillOperator.class.getSimpleName());
+    return driverContext;
+  }
+
+  private TopKRankingOperator genTopKRankingOperator(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortOrder> sortOrders,
+      int maxRowCountPerPartition,
+      boolean partial) {
+    return genTopKRankingOperator(
+        timeArray,
+        deviceArray,
+        valueArray,
+        partitionChannels,
+        partitionTypes,
+        sortChannels,
+        sortOrders,
+        maxRowCountPerPartition,
+        partial,
+        TopKRankingNode.RankingType.ROW_NUMBER);
+  }
+
+  private TopKRankingOperator genTopKRankingOperator(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortOrder> sortOrders,
+      int maxRowCountPerPartition,
+      boolean partial,
+      TopKRankingNode.RankingType rankingType) {
+    DriverContext driverContext = createDriverContext();
+
+    List<TSDataType> inputDataTypes =
+        Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32);
+    List<Integer> outputChannels = new ArrayList<>();
+    for (int i = 0; i < inputDataTypes.size(); i++) {
+      outputChannels.add(i);
+    }
+
+    Operator childOperator = new ChildOperator(timeArray, deviceArray, 
valueArray, driverContext);
+    return new TopKRankingOperator(
+        driverContext.getOperatorContexts().get(0),
+        childOperator,
+        rankingType,
+        inputDataTypes,
+        outputChannels,
+        partitionChannels,
+        partitionTypes,
+        sortChannels,
+        sortOrders,
+        maxRowCountPerPartition,
+        partial,
+        Optional.empty(),
+        10,
+        Optional.empty());
+  }
+
+  static class ChildOperator implements Operator {
+    private int index;
+    private final long[][] timeArray;
+    private final String[][] deviceArray;
+    private final int[][] valueArray;
+    private final DriverContext driverContext;
+
+    ChildOperator(
+        long[][] timeArray,
+        String[][] deviceArray,
+        int[][] valueArray,
+        DriverContext driverContext) {
+      this.timeArray = timeArray;
+      this.deviceArray = deviceArray;
+      this.valueArray = valueArray;
+      this.driverContext = driverContext;
+      this.index = 0;
+    }
+
+    @Override
+    public OperatorContext getOperatorContext() {
+      return driverContext.getOperatorContexts().get(0);
+    }
+
+    @Override
+    public TsBlock next() {
+      if (index >= timeArray.length) {
+        return null;
+      }
+      TsBlockBuilder builder =
+          new TsBlockBuilder(
+              timeArray[index].length,
+              Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, 
TSDataType.INT32));
+      for (int i = 0; i < timeArray[index].length; i++) {
+        builder.getColumnBuilder(0).writeLong(timeArray[index][i]);
+        builder
+            .getColumnBuilder(1)
+            .writeBinary(new Binary(deviceArray[index][i], 
TSFileConfig.STRING_CHARSET));
+        builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
+      }
+      builder.declarePositions(timeArray[index].length);
+      index++;
+      return builder.build(
+          new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
builder.getPositionCount()));
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < timeArray.length;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return index >= timeArray.length;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public long calculateMaxPeekMemory() {
+      return 0;
+    }
+
+    @Override
+    public long calculateMaxReturnSize() {
+      return 0;
+    }
+
+    @Override
+    public long calculateRetainedSizeAfterCallingNext() {
+      return 0;
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
index e31f2f7e580..b80fda29e10 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
@@ -122,7 +122,7 @@ public class WindowFunctionOptimizationTest {
     PlanTester planTester = new PlanTester();
 
     String sql =
-        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, 
tag3 ORDER BY s1) as rn FROM table1) WHERE rn <= 2";
+        "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY tag1, tag2, tag3 
ORDER BY s1) as rn FROM table1) WHERE rn <= 2";
     LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
     PlanMatchPattern tableScan = tableScan("testdb.table1");
 
@@ -161,7 +161,7 @@ public class WindowFunctionOptimizationTest {
     PlanTester planTester = new PlanTester();
 
     String sql =
-        "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER 
BY s1) as rn FROM table1) WHERE rn <= 2";
+        "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY tag1 ORDER BY s1) 
as rn FROM table1) WHERE rn <= 2";
     LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
     PlanMatchPattern tableScan = tableScan("testdb.table1");
 

Reply via email to