This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/window_func_optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fa450f17865ce878ba74e1c49b376d62f587cbc3
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Fri Dec 26 03:15:01 2025 +0800

    Finish all rules, nodes and operators.
---
 .../execution/operator/GroupedTopNBuilder.java     |  13 +
 .../operator/GroupedTopNRowNumberAccumulator.java  | 538 +++++++++++++++++++++
 .../operator/GroupedTopNRowNumberBuilder.java      | 162 +++++++
 .../queryengine/execution/operator/IdRegistry.java |  57 +++
 .../operator/RowIdComparisonStrategy.java          |   5 +
 .../execution/operator/RowIdHashStrategy.java      |   8 +
 .../execution/operator/RowReference.java           |  26 +
 .../operator/RowReferencePageManager.java          | 361 ++++++++++++++
 .../SimpleTsBlockWithPositionComparator.java       |  27 ++
 .../operator/TsBlockWithPositionComparator.java    |   7 +
 .../execution/operator/process/ValuesOperator.java | 120 +++++
 .../operator/process/window/RowNumberOperator.java | 214 ++++++++
 .../process/window/TopKRankingOperator.java        | 277 +++++++++++
 .../grouped/array/IntArrayFIFOQueue.java           | 182 +++++++
 .../grouped/array/LongBigArrayFIFOQueue.java       | 178 +++++++
 .../grouped/hash/NoChannelGroupByHash.java         |  53 ++
 .../plan/planner/plan/node/PlanNodeType.java       |   3 +
 .../plan/planner/plan/node/PlanVisitor.java        |  11 +
 .../iterative/rule/GatherAndMergeWindows.java      | 315 ++++++++++++
 .../iterative/rule/PushDownFilterIntoWindow.java   | 147 ++++++
 .../iterative/rule/PushDownLimitIntoWindow.java    |  81 ++++
 .../iterative/rule/RemoveRedundantWindow.java      |  26 +
 .../iterative/rule/ReplaceWindowWithRowNumber.java |  43 ++
 .../relational/planner/iterative/rule/Util.java    |  45 +-
 .../relational/planner/node/RowNumberNode.java     | 177 +++++++
 .../relational/planner/node/TopKRankingNode.java   | 168 +++++++
 .../plan/relational/planner/node/ValuesNode.java   | 211 ++++++++
 .../plan/relational/planner/node/WindowNode.java   |   4 +
 .../org/apache/iotdb/db/utils/HeapTraversal.java   |  45 ++
 29 files changed, 3483 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java
new file mode 100644
index 00000000000..54b1a2bf66a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.util.Iterator;
+
+/**
+ * Builder that consumes input {@link TsBlock}s and later exposes the rows it kept as an iterator
+ * of output blocks.
+ */
+public interface GroupedTopNBuilder {
+  /** Feeds one more input block into the builder. */
+  void addTsBlock(TsBlock tsBlock);
+
+  /** Returns an iterator over the output blocks built from the input added so far. */
+  Iterator<TsBlock> getResult();
+
+  /** Returns the estimated memory retained by this builder. */
+  long getEstimatedSizeInBytes();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java
new file mode 100644
index 00000000000..5d53ff08b76
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java
@@ -0,0 +1,538 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue;
+import org.apache.iotdb.db.utils.HeapTraversal;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.function.LongConsumer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.lang.Math.abs;
+import static java.lang.Math.max;
+import static java.util.Objects.requireNonNull;
+
+public class GroupedTopNRowNumberAccumulator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRowNumberAccumulator.class);
+  private static final long UNKNOWN_INDEX = -1;
+
+  private final GroupIdToHeapBuffer groupIdToHeapBuffer = new 
GroupIdToHeapBuffer();
+  private final HeapNodeBuffer heapNodeBuffer = new HeapNodeBuffer();
+  private final HeapTraversal heapTraversal = new HeapTraversal();
+
+  private final RowIdComparisonStrategy strategy;
+  private final int topN;
+  private final LongConsumer rowIdEvictionListener;
+
+  /**
+   * Creates an accumulator that keeps at most {@code topN} rows per group.
+   *
+   * @param strategy ordering used to compare two row IDs; must not be null
+   * @param topN maximum number of rows kept per group; must be greater than zero
+   * @param rowIdEvictionListener callback notified with the row ID that is displaced when a
+   *     better row replaces the root of a full heap; must not be null
+   */
+  public GroupedTopNRowNumberAccumulator(
+      RowIdComparisonStrategy strategy, int topN, LongConsumer 
rowIdEvictionListener) {
+    this.strategy = requireNonNull(strategy, "strategy is null");
+    checkArgument(topN > 0, "topN must be greater than zero");
+    this.topN = topN;
+    this.rowIdEvictionListener =
+        requireNonNull(rowIdEvictionListener, "rowIdEvictionListener is null");
+  }
+
+  public long sizeOf() {
+    return INSTANCE_SIZE
+        + groupIdToHeapBuffer.sizeOf()
+        + heapNodeBuffer.sizeOf()
+        + heapTraversal.sizeOf();
+  }
+
+  /**
+   * Scans {@code newPage} for the first position that may still have to be added.
+   *
+   * <p>A position qualifies when its group was unknown before this call, when the group's heap
+   * holds fewer than {@code topN} rows, or when the row compares lower than the row currently at
+   * the root of the group's heap.
+   *
+   * @param newPage block whose rows are inspected
+   * @param groupCount group count used to grow the per-group buffers before scanning
+   * @param groupIds group ID of every position in {@code newPage}
+   * @param comparator compares a position of {@code newPage} against a stored row
+   * @param pageManager resolves stored row IDs to their block and position
+   * @return the first qualifying position, or -1 if no position qualifies
+   */
+  public int findFirstPositionToAdd(
+      TsBlock newPage,
+      int groupCount,
+      int[] groupIds,
+      TsBlockWithPositionComparator comparator,
+      RowReferencePageManager pageManager) {
+    // Snapshot the group total before growing: IDs at or beyond it have no heap yet
+    final int knownGroups = groupIdToHeapBuffer.getTotalGroups();
+    groupIdToHeapBuffer.allocateGroupIfNeeded(groupCount);
+
+    int position = 0;
+    while (position < newPage.getPositionCount()) {
+      final int groupId = groupIds[position];
+
+      boolean heapHasRoom = groupId >= knownGroups || calculateRootRowNumber(groupId) < topN;
+      if (heapHasRoom) {
+        return position;
+      }
+
+      final long rootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+      if (rootNodeIndex == UNKNOWN_INDEX) {
+        return position;
+      }
+
+      // The heap is full: the row only matters if it beats the current root
+      final long rootRowId = heapNodeBuffer.getRowId(rootNodeIndex);
+      final TsBlock rootPage = pageManager.getPage(rootRowId);
+      final int rootPosition = pageManager.getPosition(rootRowId);
+      int order = comparator.compareTo(newPage, position, rootPage, rootPosition);
+      if (order < 0) {
+        return position;
+      }
+
+      position++;
+    }
+    return -1;
+  }
+
+  /**
+   * Offers a row to the heap of {@code groupId}.
+   *
+   * <p>While the group holds fewer than {@code topN} rows the row is simply inserted. Once the
+   * heap is full, the row is accepted only if it compares lower than the current root; the root
+   * is then displaced and reported to the eviction listener.
+   *
+   * @param groupId group the row belongs to
+   * @param rowReference reference to the candidate row; a row ID is allocated only on acceptance
+   * @return true if this row was incorporated, false otherwise
+   */
+  public boolean add(int groupId, RowReference rowReference) {
+    groupIdToHeapBuffer.allocateGroupIfNeeded(groupId);
+
+    final long rootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    boolean heapHasRoom = rootNodeIndex == UNKNOWN_INDEX || calculateRootRowNumber(groupId) < topN;
+    if (heapHasRoom) {
+      heapInsert(groupId, rowReference.allocateRowId());
+      return true;
+    }
+
+    final long rootRowId = heapNodeBuffer.getRowId(rootNodeIndex);
+    if (rowReference.compareTo(strategy, rootRowId) >= 0) {
+      // Not better than the worst row already kept
+      return false;
+    }
+
+    heapPopAndInsert(groupId, rowReference.allocateRowId(), rowIdEvictionListener);
+    return true;
+  }
+
+  /**
+   * Moves every row ID held for {@code groupId} into {@code rowIdOutput}, leaving the group's
+   * heap empty.
+   *
+   * <p>Row IDs are written in increasing rank order starting at index 0. The eviction listener is
+   * not invoked, because the rows are handed over to the caller rather than discarded.
+   *
+   * @param groupId group to drain
+   * @param rowIdOutput destination buffer; grown to fit the group's rows
+   * @return number of rows deposited to the output buffer
+   */
+  public long drainTo(int groupId, LongBigArray rowIdOutput) {
+    final long drainedCount = groupIdToHeapBuffer.getHeapSize(groupId);
+    rowIdOutput.ensureCapacity(drainedCount);
+
+    // The max heap surfaces the highest-ranked row first, so fill the output from the back
+    long outputIndex = drainedCount;
+    while (outputIndex > 0) {
+      outputIndex--;
+      rowIdOutput.set(outputIndex, peekRootRowId(groupId));
+      // Explicit extraction requested by the caller: pop without notifying any listener
+      heapPop(groupId, null);
+    }
+    return drainedCount;
+  }
+
+  private long calculateRootRowNumber(int groupId) {
+    return groupIdToHeapBuffer.getHeapSize(groupId);
+  }
+
+  private long peekRootRowId(int groupId) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "No root to peek");
+    return heapNodeBuffer.getRowId(heapRootNodeIndex);
+  }
+
+  /** Returns the node index of the requested child of {@code heapNodeIndex}. */
+  private long getChildIndex(long heapNodeIndex, HeapTraversal.Child child) {
+    if (child == HeapTraversal.Child.LEFT) {
+      return heapNodeBuffer.getLeftChildHeapIndex(heapNodeIndex);
+    }
+    return heapNodeBuffer.getRightChildHeapIndex(heapNodeIndex);
+  }
+
+  /** Points the requested child link of {@code heapNodeIndex} at {@code newChildIndex}. */
+  private void setChildIndex(long heapNodeIndex, HeapTraversal.Child child, long newChildIndex) {
+    if (child != HeapTraversal.Child.LEFT) {
+      heapNodeBuffer.setRightChildHeapIndex(heapNodeIndex, newChildIndex);
+      return;
+    }
+    heapNodeBuffer.setLeftChildHeapIndex(heapNodeIndex, newChildIndex);
+  }
+
+  /**
+   * Pop the root node off the group ID's max heap.
+   *
+   * @param contextEvictionListener optional callback for the root node that 
gets popped off
+   */
+  private void heapPop(int groupId, LongConsumer contextEvictionListener) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "Group ID has an empty 
heap");
+
+    long lastNodeIndex = heapDetachLastInsertionLeaf(groupId);
+    long lastRowId = heapNodeBuffer.getRowId(lastNodeIndex);
+    heapNodeBuffer.deallocate(lastNodeIndex);
+
+    if (lastNodeIndex == heapRootNodeIndex) {
+      // The root is the last node remaining
+      if (contextEvictionListener != null) {
+        contextEvictionListener.accept(lastRowId);
+      }
+    } else {
+      // Pop the root and insert lastRowId back into the heap to ensure a 
balanced tree
+      heapPopAndInsert(groupId, lastRowId, contextEvictionListener);
+    }
+  }
+
+  /**
+   * Detaches (but does not deallocate) the leaf in the bottom right-most position in the heap.
+   *
+   * <p>Given the fixed insertion order, the bottom right-most leaf will correspond to the last
+   * leaf node inserted into the balanced heap. The group's recorded heap size is reduced by one,
+   * or reset to zero together with the root index when the root itself is detached.
+   *
+   * @param groupId group whose heap is modified
+   * @return leaf node index that was detached from the heap
+   */
+  private long heapDetachLastInsertionLeaf(int groupId) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    long heapSize = groupIdToHeapBuffer.getHeapSize(groupId);
+
+    // Remember the parent and the branch taken so the leaf can be unlinked once it is reached
+    long previousNodeIndex = UNKNOWN_INDEX;
+    HeapTraversal.Child childPosition = null;
+    long currentNodeIndex = heapRootNodeIndex;
+
+    // Follow the path the traversal helper reports for position heapSize (the last insertion)
+    heapTraversal.resetWithPathTo(heapSize);
+    while (!heapTraversal.isTarget()) {
+      previousNodeIndex = currentNodeIndex;
+      childPosition = heapTraversal.nextChild();
+      currentNodeIndex = getChildIndex(currentNodeIndex, childPosition);
+      verify(currentNodeIndex != UNKNOWN_INDEX, "Target node must exist");
+    }
+
+    // Detach the last insertion leaf node, but do not deallocate yet
+    if (previousNodeIndex == UNKNOWN_INDEX) {
+      // Last insertion leaf was the root node
+      groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, UNKNOWN_INDEX);
+      groupIdToHeapBuffer.setHeapSize(groupId, 0);
+    } else {
+      // Unlink the leaf from its parent and account for the removed node
+      setChildIndex(previousNodeIndex, childPosition, UNKNOWN_INDEX);
+      groupIdToHeapBuffer.addHeapSize(groupId, -1);
+    }
+
+    return currentNodeIndex;
+  }
+
+  /**
+   * Inserts a new row into the heap for the specified group ID.
+   *
+   * <p>The technique involves traversing the heap from the root to a new bottom left-priority
+   * leaf position, potentially swapping heap nodes along the way to find the proper insertion
+   * position for the new row. Insertions always fill the left child before the right, and fill up
+   * an entire heap level before moving to the next level.
+   *
+   * @param groupId group whose heap receives the row
+   * @param newRowId row ID to store; the caller is responsible for checking the topN limit
+   */
+  private void heapInsert(int groupId, long newRowId) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    if (heapRootNodeIndex == UNKNOWN_INDEX) {
+      // Heap is currently empty, so this will be the first node
+      heapRootNodeIndex = heapNodeBuffer.allocateNewNode(newRowId);
+
+      groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, heapRootNodeIndex);
+      groupIdToHeapBuffer.setHeapSize(groupId, 1);
+      return;
+    }
+
+    // Remember the parent and the branch taken so the new leaf can be linked afterwards
+    long previousHeapNodeIndex = UNKNOWN_INDEX;
+    HeapTraversal.Child childPosition = null;
+    long currentHeapNodeIndex = heapRootNodeIndex;
+
+    // Target the slot right after the current last node
+    heapTraversal.resetWithPathTo(groupIdToHeapBuffer.getHeapSize(groupId) + 
1);
+    while (!heapTraversal.isTarget()) {
+      long currentRowId = heapNodeBuffer.getRowId(currentHeapNodeIndex);
+      if (strategy.compare(newRowId, currentRowId) > 0) {
+        // Swap the row values: the larger row stays here and the smaller one keeps descending
+        heapNodeBuffer.setRowId(currentHeapNodeIndex, newRowId);
+
+        newRowId = currentRowId;
+      }
+
+      previousHeapNodeIndex = currentHeapNodeIndex;
+      childPosition = heapTraversal.nextChild();
+      currentHeapNodeIndex = getChildIndex(currentHeapNodeIndex, 
childPosition);
+    }
+
+    verify(
+        previousHeapNodeIndex != UNKNOWN_INDEX && childPosition != null,
+        "heap must have at least one node before starting traversal");
+    verify(currentHeapNodeIndex == UNKNOWN_INDEX, "New child shouldn't exist 
yet");
+
+    // newRowId now holds whichever row ended up at the bottom of the path
+    long newHeapNodeIndex = heapNodeBuffer.allocateNewNode(newRowId);
+
+    //  Link the new child to the parent
+    setChildIndex(previousHeapNodeIndex, childPosition, newHeapNodeIndex);
+
+    groupIdToHeapBuffer.incrementHeapSize(groupId);
+  }
+
+  /**
+   * Pop the root node off the group ID's max heap and insert the newRowId.
+   *
+   * <p>These two operations are more efficient if performed together. The technique involves
+   * treating the root as a vacancy and bubbling that vacancy down the heap until the new row can
+   * be placed without violating the heap order. The node structure and the recorded heap size
+   * are left unchanged; only row IDs move between nodes.
+   *
+   * @param groupId group whose heap is modified
+   * @param newRowId row ID that takes the place of the popped root
+   * @param contextEvictionListener optional callback for the root node that gets popped off
+   */
+  private void heapPopAndInsert(int groupId, long newRowId, LongConsumer 
contextEvictionListener) {
+    long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+    checkState(heapRootNodeIndex != UNKNOWN_INDEX, "popAndInsert() requires at 
least a root node");
+
+    // Remember the root's row so it can be reported; the root slot is overwritten below
+    long poppedRowId = heapNodeBuffer.getRowId(heapRootNodeIndex);
+
+    long currentNodeIndex = heapRootNodeIndex;
+    while (true) {
+      long maxChildNodeIndex = 
heapNodeBuffer.getLeftChildHeapIndex(currentNodeIndex);
+      if (maxChildNodeIndex == UNKNOWN_INDEX) {
+        // Left is always inserted before right, so a missing left child means there can't be a
+        // right child, which means this must already be a leaf position.
+        break;
+      }
+      long maxChildRowId = heapNodeBuffer.getRowId(maxChildNodeIndex);
+
+      // Prefer the right child only when it is strictly greater than the left one
+      long rightChildNodeIndex = 
heapNodeBuffer.getRightChildHeapIndex(currentNodeIndex);
+      if (rightChildNodeIndex != UNKNOWN_INDEX) {
+        long rightRowId = heapNodeBuffer.getRowId(rightChildNodeIndex);
+        if (strategy.compare(rightRowId, maxChildRowId) > 0) {
+          maxChildNodeIndex = rightChildNodeIndex;
+          maxChildRowId = rightRowId;
+        }
+      }
+
+      if (strategy.compare(newRowId, maxChildRowId) >= 0) {
+        // New row is greater than or equal to both children, so the heap invariant is satisfied
+        // by inserting the new row at this position
+        break;
+      }
+
+      // Swap the max child row value into the current node
+      heapNodeBuffer.setRowId(currentNodeIndex, maxChildRowId);
+
+      // Max child now has an unfilled vacancy, so continue processing with that as the current
+      // node
+      currentNodeIndex = maxChildNodeIndex;
+    }
+
+    // Fill the vacancy with the new row
+    heapNodeBuffer.setRowId(currentNodeIndex, newRowId);
+
+    if (contextEvictionListener != null) {
+      contextEvictionListener.accept(poppedRowId);
+    }
+  }
+
+  /** Sanity check the invariants of the underlying data structure. */
+  @VisibleForTesting
+  void verifyIntegrity() {
+    long totalHeapNodes = 0;
+    for (int groupId = 0; groupId < groupIdToHeapBuffer.getTotalGroups(); 
groupId++) {
+      long heapSize = groupIdToHeapBuffer.getHeapSize(groupId);
+      long rootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId);
+      verify(
+          rootNodeIndex == UNKNOWN_INDEX || calculateRootRowNumber(groupId) <= 
topN,
+          "Max heap has more values than needed");
+      IntegrityStats integrityStats = verifyHeapIntegrity(rootNodeIndex);
+      verify(
+          integrityStats.getNodeCount() == heapSize,
+          "Recorded heap size does not match actual heap size");
+      totalHeapNodes += integrityStats.getNodeCount();
+    }
+    verify(
+        totalHeapNodes == heapNodeBuffer.getActiveNodeCount(),
+        "Failed to deallocate some unused nodes");
+  }
+
+  /**
+   * Recursively checks the subtree rooted at {@code heapNodeIndex}.
+   *
+   * <p>Verifies that each node compares greater than or equal to its children, that a right child
+   * never exists without a left child, and that the depths of the two subtrees differ by at most
+   * one.
+   *
+   * @param heapNodeIndex root of the subtree, or {@code UNKNOWN_INDEX} for an empty subtree
+   * @return maximum depth and node count of the subtree (both zero when it is empty)
+   */
+  private IntegrityStats verifyHeapIntegrity(long heapNodeIndex) {
+    if (heapNodeIndex == UNKNOWN_INDEX) {
+      return new IntegrityStats(0, 0);
+    }
+    long rowId = heapNodeBuffer.getRowId(heapNodeIndex);
+    long leftChildHeapIndex = 
heapNodeBuffer.getLeftChildHeapIndex(heapNodeIndex);
+    long rightChildHeapIndex = 
heapNodeBuffer.getRightChildHeapIndex(heapNodeIndex);
+
+    if (leftChildHeapIndex != UNKNOWN_INDEX) {
+      verify(
+          strategy.compare(rowId, heapNodeBuffer.getRowId(leftChildHeapIndex)) 
>= 0,
+          "Max heap invariant violated");
+    }
+    if (rightChildHeapIndex != UNKNOWN_INDEX) {
+      verify(leftChildHeapIndex != UNKNOWN_INDEX, "Left should always be 
inserted before right");
+      verify(
+          strategy.compare(rowId, 
heapNodeBuffer.getRowId(rightChildHeapIndex)) >= 0,
+          "Max heap invariant violated");
+    }
+
+    // Validate both subtrees before combining their statistics
+    IntegrityStats leftIntegrityStats = 
verifyHeapIntegrity(leftChildHeapIndex);
+    IntegrityStats rightIntegrityStats = 
verifyHeapIntegrity(rightChildHeapIndex);
+
+    verify(
+        abs(leftIntegrityStats.getMaxDepth() - 
rightIntegrityStats.getMaxDepth()) <= 1,
+        "Heap not balanced");
+
+    // Account for the current node in both the depth and the node count
+    return new IntegrityStats(
+        max(leftIntegrityStats.getMaxDepth(), 
rightIntegrityStats.getMaxDepth()) + 1,
+        leftIntegrityStats.getNodeCount() + rightIntegrityStats.getNodeCount() 
+ 1);
+  }
+
+  /** Immutable pair of the maximum depth and the node count measured for a heap subtree. */
+  private static class IntegrityStats {
+    private final long maxDepth;
+    private final long nodeCount;
+
+    public IntegrityStats(long maxDepth, long nodeCount) {
+      this.maxDepth = maxDepth;
+      this.nodeCount = nodeCount;
+    }
+
+    /** Returns the maximum depth recorded for the subtree. */
+    public long getMaxDepth() {
+      return maxDepth;
+    }
+
+    /** Returns the number of nodes recorded for the subtree. */
+    public long getNodeCount() {
+      return nodeCount;
+    }
+  }
+
+  /**
+   * Buffer abstracting a mapping from group ID to a heap. The group ID provides the index for
+   * all operations.
+   *
+   * <p>For each group it stores the node index of the heap root and the heap size.
+   */
+  private static class GroupIdToHeapBuffer {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(GroupIdToHeapBuffer.class);
+
+    /*
+     *  Memory layout:
+     *  [LONG] heapNodeIndex1,
+     *  [LONG] heapNodeIndex2,
+     *  ...
+     */
+    // Since we have a single element per group, this array is effectively indexed on group ID.
+    // Constructed with UNKNOWN_INDEX, presumably the default value of unset slots.
+    private final LongBigArray heapIndexBuffer = new 
LongBigArray(UNKNOWN_INDEX);
+
+    /*
+     *  Memory layout:
+     *  [LONG] heapSize1,
+     *  [LONG] heapSize2,
+     *  ...
+     */
+    // Since we have a single element per group, this array is effectively indexed on group ID
+    private final LongBigArray sizeBuffer = new LongBigArray(0);
+
+    private int totalGroups;
+
+    /**
+     * Grows both buffers so that {@code groupId} is addressable. Does nothing when the group is
+     * already covered.
+     */
+    public void allocateGroupIfNeeded(int groupId) {
+      if (totalGroups > groupId) {
+        return;
+      }
+      // Group IDs generated by GroupByHash are always generated consecutively starting from 0,
+      // so observing a group ID N means groups [0, N] inclusive must exist.
+      totalGroups = groupId + 1;
+      heapIndexBuffer.ensureCapacity(totalGroups);
+      sizeBuffer.ensureCapacity(totalGroups);
+    }
+
+    /** Returns the number of groups the buffers have been sized for. */
+    public int getTotalGroups() {
+      return totalGroups;
+    }
+
+    /** Returns the node index stored as the heap root of {@code groupId}. */
+    public long getHeapRootNodeIndex(int groupId) {
+      return heapIndexBuffer.get(groupId);
+    }
+
+    /** Stores {@code heapNodeIndex} as the heap root of {@code groupId}. */
+    public void setHeapRootNodeIndex(int groupId, long heapNodeIndex) {
+      heapIndexBuffer.set(groupId, heapNodeIndex);
+    }
+
+    /** Returns the recorded heap size of {@code groupId}. */
+    public long getHeapSize(int groupId) {
+      return sizeBuffer.get(groupId);
+    }
+
+    /** Overwrites the recorded heap size of {@code groupId}. */
+    public void setHeapSize(int groupId, long count) {
+      sizeBuffer.set(groupId, count);
+    }
+
+    /** Adds {@code delta} (which may be negative) to the recorded heap size. */
+    public void addHeapSize(int groupId, long delta) {
+      sizeBuffer.add(groupId, delta);
+    }
+
+    /** Increases the recorded heap size of {@code groupId} by one. */
+    public void incrementHeapSize(int groupId) {
+      sizeBuffer.increment(groupId);
+    }
+
+    /** Returns the shallow instance size plus the sizes reported by both buffers. */
+    public long sizeOf() {
+      return INSTANCE_SIZE + heapIndexBuffer.sizeOf() + sizeBuffer.sizeOf();
+    }
+  }
+
+  /**
+   * Buffer abstracting storage of nodes in the heap. Nodes are referenced by their node index
+   * for operations.
+   *
+   * <p>Each node occupies {@code POSITIONS_PER_ENTRY} consecutive longs: the row ID followed by
+   * the left and right child node indexes. Released node indexes are queued and reused by later
+   * allocations.
+   */
+  private static class HeapNodeBuffer {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(HeapNodeBuffer.class);
+    private static final int POSITIONS_PER_ENTRY = 3;
+    private static final int LEFT_CHILD_HEAP_INDEX_OFFSET = 1;
+    private static final int RIGHT_CHILD_HEAP_INDEX_OFFSET = 2;
+
+    /*
+     *  Memory layout:
+     *  [LONG] rowId1, [LONG] leftChildNodeIndex1, [LONG] rightChildNodeIndex1,
+     *  [LONG] rowId2, [LONG] leftChildNodeIndex2, [LONG] rightChildNodeIndex2,
+     *  ...
+     */
+    private final LongBigArray buffer = new LongBigArray();
+
+    // Node indexes that were deallocated and can be handed out again
+    private final LongBigArrayFIFOQueue emptySlots = new 
LongBigArrayFIFOQueue();
+
+    // Number of node slots ever carved out of the buffer (active plus released)
+    private long capacity;
+
+    /**
+     * Allocates storage for a new heap node.
+     *
+     * <p>A previously released slot is reused when one is queued; otherwise the buffer grows by
+     * one node. The node starts with the given row ID and no children.
+     *
+     * @return index referencing the node
+     */
+    public long allocateNewNode(long rowId) {
+      long newHeapIndex;
+      if (!emptySlots.isEmpty()) {
+        newHeapIndex = emptySlots.dequeueLong();
+      } else {
+        newHeapIndex = capacity;
+        capacity++;
+        buffer.ensureCapacity(capacity * POSITIONS_PER_ENTRY);
+      }
+
+      setRowId(newHeapIndex, rowId);
+      setLeftChildHeapIndex(newHeapIndex, UNKNOWN_INDEX);
+      setRightChildHeapIndex(newHeapIndex, UNKNOWN_INDEX);
+
+      return newHeapIndex;
+    }
+
+    /** Marks the node slot as free for reuse; the slot contents are not cleared here. */
+    public void deallocate(long index) {
+      emptySlots.enqueue(index);
+    }
+
+    /** Returns the number of slots handed out and not yet released. */
+    public long getActiveNodeCount() {
+      return capacity - emptySlots.longSize();
+    }
+
+    /** Returns the row ID stored in the node. */
+    public long getRowId(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY);
+    }
+
+    /** Overwrites the row ID stored in the node. */
+    public void setRowId(long index, long rowId) {
+      buffer.set(index * POSITIONS_PER_ENTRY, rowId);
+    }
+
+    /** Returns the node index of the left child. */
+    public long getLeftChildHeapIndex(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + 
LEFT_CHILD_HEAP_INDEX_OFFSET);
+    }
+
+    /** Sets the node index of the left child. */
+    public void setLeftChildHeapIndex(long index, long childHeapIndex) {
+      buffer.set(index * POSITIONS_PER_ENTRY + LEFT_CHILD_HEAP_INDEX_OFFSET, 
childHeapIndex);
+    }
+
+    /** Returns the node index of the right child. */
+    public long getRightChildHeapIndex(long index) {
+      return buffer.get(index * POSITIONS_PER_ENTRY + 
RIGHT_CHILD_HEAP_INDEX_OFFSET);
+    }
+
+    /** Sets the node index of the right child. */
+    public void setRightChildHeapIndex(long index, long childHeapIndex) {
+      buffer.set(index * POSITIONS_PER_ENTRY + RIGHT_CHILD_HEAP_INDEX_OFFSET, 
childHeapIndex);
+    }
+
+    /** Returns the shallow instance size plus the sizes of the node buffer and the free queue. */
+    public long sizeOf() {
+      return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java
new file mode 100644
index 00000000000..941d2ccafa1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java
@@ -0,0 +1,162 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+
+public class GroupedTopNRowNumberBuilder implements GroupedTopNBuilder {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRowNumberBuilder.class);
+
+  private final List<TSDataType> sourceTypes;
+  private final boolean produceRowNumber;
+  private final int[] groupByChannels;
+  private final GroupByHash groupByHash;
+  private final RowReferencePageManager pageManager = new 
RowReferencePageManager();
+  private final GroupedTopNRowNumberAccumulator 
groupedTopNRowNumberAccumulator;
+  private final TsBlockWithPositionComparator comparator;
+
+  /**
+   * Creates a builder that keeps the top {@code topN} rows of every group.
+   *
+   * @param sourceTypes data types of the input columns that are copied to the output
+   * @param comparator ordering of rows, used to decide which rows are kept
+   * @param topN maximum number of rows kept per group
+   * @param produceRowNumber whether an additional INT64 row-number column is appended
+   * @param groupByChannels indexes of the input columns passed to {@code groupByHash}
+   * @param groupByHash assigns a group ID to every input row
+   */
+  public GroupedTopNRowNumberBuilder(
+      List<TSDataType> sourceTypes,
+      TsBlockWithPositionComparator comparator,
+      int topN,
+      boolean produceRowNumber,
+      int[] groupByChannels,
+      GroupByHash groupByHash) {
+    this.sourceTypes = sourceTypes;
+    this.produceRowNumber = produceRowNumber;
+    this.groupByChannels = groupByChannels;
+    this.groupByHash = groupByHash;
+    this.comparator = comparator;
+
+    this.groupedTopNRowNumberAccumulator =
+        new GroupedTopNRowNumberAccumulator(
+            // Compare two stored rows by resolving their row IDs to block and position
+            (leftRowId, rightRowId) -> {
+              TsBlock leftTsBlock = pageManager.getPage(leftRowId);
+              int leftPosition = pageManager.getPosition(leftRowId);
+              TsBlock rightTsBlock = pageManager.getPage(rightRowId);
+              int rightPosition = pageManager.getPosition(rightRowId);
+              return comparator.compareTo(leftTsBlock, leftPosition, 
rightTsBlock, rightPosition);
+            },
+            topN,
+            // Evicted rows are dereferenced in the page manager
+            pageManager::dereference);
+  }
+
+  /**
+   * Assigns a group ID to every row of {@code tsBlock} and offers the rows to the accumulator.
+   *
+   * @param tsBlock input block to consume
+   */
+  @Override
+  public void addTsBlock(TsBlock tsBlock) {
+    // Group IDs must be resolved first: the group count is read only afterwards
+    final int[] groupIds = groupByHash.getGroupIds(tsBlock.getColumns(groupByChannels));
+    processTsBlock(tsBlock, groupByHash.getGroupCount(), groupIds);
+  }
+
+  /** Returns a new iterator that drains the accumulated rows group by group. */
+  @Override
+  public Iterator<TsBlock> getResult() {
+    final Iterator<TsBlock> results = new ResultIterator();
+    return results;
+  }
+
+  @Override
+  public long getEstimatedSizeInBytes() {
+    return INSTANCE_SIZE
+        + groupByHash.getEstimatedSize()
+        + pageManager.sizeOf()
+        + groupedTopNRowNumberAccumulator.sizeOf();
+  }
+
+  private void processTsBlock(TsBlock newTsBlock, int groupCount, int[] 
groupIds) {
+    int firstPositionToAdd =
+        groupedTopNRowNumberAccumulator.findFirstPositionToAdd(
+            newTsBlock, groupCount, groupIds, comparator, pageManager);
+    if (firstPositionToAdd < 0) {
+      return;
+    }
+
+    try (RowReferencePageManager.LoadCursor loadCursor =
+        pageManager.add(newTsBlock, firstPositionToAdd)) {
+      for (int position = firstPositionToAdd;
+          position < newTsBlock.getPositionCount();
+          position++) {
+        int groupId = groupIds[position];
+        loadCursor.advance();
+        groupedTopNRowNumberAccumulator.add(groupId, loadCursor);
+      }
+    }
+
+    pageManager.compactIfNeeded();
+  }
+
+  /**
+   * Iterator producing the output blocks.
+   *
+   * <p>Groups are drained from the accumulator one at a time in group ID order; within a group,
+   * rows are emitted in increasing rank order. When {@code produceRowNumber} is set, an extra
+   * INT64 column carries the 1-based row number of each row inside its group.
+   */
+  private class ResultIterator extends AbstractIterator<TsBlock> {
+    private final TsBlockBuilder pageBuilder;
+    private final int groupIdCount = groupByHash.getGroupCount();
+    private int currentGroupId = -1;
+    private final LongBigArray rowIdOutput = new LongBigArray();
+    private long currentGroupSize;
+    private int currentIndexInGroup;
+
+    /** Sets up the output builder with the source types plus the optional row-number column. */
+    ResultIterator() {
+      ImmutableList.Builder<TSDataType> sourceTypesBuilders =
+          ImmutableList.<TSDataType>builder().addAll(sourceTypes);
+      if (produceRowNumber) {
+        sourceTypesBuilders.add(TSDataType.INT64);
+      }
+      pageBuilder = new TsBlockBuilder(sourceTypesBuilders.build());
+    }
+
+    /**
+     * Fills the output builder until it is full or all groups are exhausted.
+     *
+     * @return the next output block, or the end-of-data marker when no rows remain
+     */
+    @Override
+    protected TsBlock computeNext() {
+      pageBuilder.reset();
+      while (!pageBuilder.isFull()) {
+        // Move on to the next non-empty group once the current one is exhausted
+        while (currentIndexInGroup >= currentGroupSize) {
+          if (currentGroupId + 1 >= groupIdCount) {
+            return pageBuilder.isEmpty() ? endOfData() : buildOutput();
+          }
+          currentGroupId++;
+          currentGroupSize = groupedTopNRowNumberAccumulator.drainTo(currentGroupId, rowIdOutput);
+          currentIndexInGroup = 0;
+        }
+
+        long rowId = rowIdOutput.get(currentIndexInGroup);
+        TsBlock page = pageManager.getPage(rowId);
+        int position = pageManager.getPosition(rowId);
+        for (int i = 0; i < sourceTypes.size(); i++) {
+          ColumnBuilder builder = pageBuilder.getColumnBuilder(i);
+          Column column = page.getColumn(i);
+          builder.write(column, position);
+        }
+        if (produceRowNumber) {
+          ColumnBuilder builder = pageBuilder.getColumnBuilder(sourceTypes.size());
+          // Row numbers restart at 1 in every group and follow the drained rank order, so they
+          // come from the index inside the group, not from the group ID
+          builder.writeLong(currentIndexInGroup + 1);
+        }
+        pageBuilder.declarePosition();
+        currentIndexInGroup++;
+
+        // Dereference the row for hygiene, but no need to compact them at this point
+        pageManager.dereference(rowId);
+      }
+
+      if (pageBuilder.isEmpty()) {
+        return endOfData();
+      }
+      return buildOutput();
+    }
+
+    /** Builds the output block, using a run-length encoded template as the time column. */
+    private TsBlock buildOutput() {
+      return pageBuilder.build(
+          new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, pageBuilder.getPositionCount()));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java
new file mode 100644
index 00000000000..99b2f53e9de
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java
@@ -0,0 +1,57 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.IntArrayFIFOQueue;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.HashMap;
+import java.util.function.IntFunction;
+
+/**
+ * Registry that assigns integer IDs to objects and recycles IDs that have been released.
+ *
+ * @param <T> type of the registered objects
+ */
+public class IdRegistry<T> {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(IdRegistry.class);
+
+  private final HashMap<Integer, T> objects = new HashMap<>();
+  private final IntFIFOQueue emptySlots = new IntFIFOQueue();
+
+  /**
+   * Chooses an ID, lets the factory build the object for it and registers the result.
+   *
+   * <p>A previously released ID is reused when one is queued; otherwise the current number of
+   * registered objects becomes the new ID.
+   *
+   * @param factory builds the object from the ID assigned to it
+   * @return the object created by the factory (the ID itself is only passed to the factory)
+   */
+  public T allocateId(IntFunction<T> factory) {
+    int id = emptySlots.size() != 0 ? emptySlots.dequeueInt() : objects.size();
+    T created = factory.apply(id);
+    objects.put(id, created);
+    return created;
+  }
+
+  /** Unregisters the object stored under {@code id} and queues the ID for reuse. */
+  public void deallocate(int id) {
+    objects.remove(id);
+    emptySlots.enqueue(id);
+  }
+
+  /** Returns the object registered under {@code id}, or {@code null} when there is none. */
+  public T get(int id) {
+    return objects.get(id);
+  }
+
+  /** Does not include the sizes of the referenced objects themselves. */
+  public long sizeOf() {
+    long mapBytes = RamUsageEstimator.sizeOfMap(objects);
+    long queueBytes = emptySlots.sizeOf();
+    return INSTANCE_SIZE + mapBytes + queueBytes;
+  }
+
+  /** Queue of released IDs that can additionally report its own memory footprint. */
+  private static class IntFIFOQueue extends IntArrayFIFOQueue {
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(IntFIFOQueue.class);
+
+    public long sizeOf() {
+      return INSTANCE_SIZE + RamUsageEstimator.sizeOf(array);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java
new file mode 100644
index 00000000000..9b6f3df232e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+/** Strategy that orders two rows identified by their row IDs. */
+public interface RowIdComparisonStrategy {
+  /**
+   * Compares the rows behind the two row IDs.
+   *
+   * @param leftRowId ID of the left row
+   * @param rightRowId ID of the right row
+   * @return the comparison result; presumably follows the {@link java.util.Comparator} sign
+   *     convention (confirm against the implementations)
+   */
+  int compare(long leftRowId, long rightRowId);
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java
new file mode 100644
index 00000000000..137b55499e5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+/** Hash strategy that evaluates equality and hash codes over row IDs. */
+public interface RowIdHashStrategy {
+  /**
+   * Tells whether the rows behind the two row IDs are considered equal.
+   *
+   * @param leftRowId ID of the left row
+   * @param rightRowId ID of the right row
+   * @return {@code true} when the strategy treats both rows as equal
+   */
+  boolean equals(long leftRowId, long rightRowId);
+
+  /**
+   * Computes the hash of the row behind the row ID.
+   *
+   * @param rowId ID of the row to hash
+   * @return hash value of that row
+   */
+  long hashCode(long rowId);
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java
new file mode 100644
index 00000000000..92e7a9fb4b4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+/**
+ * Reference to a row.
+ *
+ * <p>Note: RowReference gives us the ability to defer row ID generation (which can be expensive
+ * in tight loops).
+ */
+public interface RowReference {
+  /**
+   * Compares the referenced row to the specified row ID using the provided
+   * RowIdComparisonStrategy.
+   *
+   * @param strategy comparison strategy to apply
+   * @param rowId ID of the row to compare against
+   * @return the comparison result
+   */
+  int compareTo(RowIdComparisonStrategy strategy, long rowId);
+
+  /**
+   * Checks equality of the referenced row with the specified row ID using the provided
+   * RowIdHashStrategy.
+   *
+   * @param strategy hash strategy to apply
+   * @param rowId ID of the row to compare against
+   * @return {@code true} when both rows are considered equal
+   */
+  boolean equals(RowIdHashStrategy strategy, long rowId);
+
+  /**
+   * Calculates the hash of the referenced row using the provided RowIdHashStrategy.
+   *
+   * @param strategy hash strategy to apply
+   * @return hash value of the referenced row
+   */
+  long hash(RowIdHashStrategy strategy);
+
+  /**
+   * Allocate a stable row ID that can be used to reference this row at a future point.
+   *
+   * @return the stable row ID
+   */
+  long allocateRowId();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java
new file mode 100644
index 00000000000..31a2de738f2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java
@@ -0,0 +1,361 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.IntBigArray;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+
+public class RowReferencePageManager {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(RowReferencePageManager.class);
+  private static final long PAGE_ACCOUNTING_INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(PageAccounting.class);
+  private static final int RESERVED_ROW_ID_FOR_CURSOR = -1;
+
+  private final IdRegistry<PageAccounting> pages = new IdRegistry<>();
+  private final RowIdBuffer rowIdBuffer = new RowIdBuffer();
+  private final HashSet<Integer> compactionCandidates = new HashSet<>();
+
+  private LoadCursor currentCursor;
+  private long pageBytes;
+
+  public LoadCursor add(TsBlock tsBlock) {
+    return add(tsBlock, 0);
+  }
+
+  public LoadCursor add(TsBlock page, int startingPosition) {
+    checkState(currentCursor == null, "Cursor still active");
+    checkArgument(
+        startingPosition >= 0 && startingPosition <= page.getPositionCount(),
+        "invalid startingPosition: %s",
+        startingPosition);
+
+    PageAccounting pageAccounting = pages.allocateId(id -> new 
PageAccounting(id, page));
+
+    pageAccounting.lockPage();
+    currentCursor =
+        new LoadCursor(
+            pageAccounting,
+            startingPosition,
+            () -> {
+              // Initiate additional actions on close
+              checkState(currentCursor != null);
+              pageAccounting.unlockPage();
+              pageAccounting.loadPageLoadIfNeeded();
+              // Account for page size after lazy loading (which can change 
the page size)
+              pageBytes += pageAccounting.sizeOf();
+              currentCursor = null;
+
+              checkPageMaintenance(pageAccounting);
+            });
+
+    return currentCursor;
+  }
+
+  public void dereference(long rowId) {
+    PageAccounting pageAccounting = pages.get(rowIdBuffer.getPageId(rowId));
+    pageAccounting.dereference(rowId);
+    checkPageMaintenance(pageAccounting);
+  }
+
+  private void checkPageMaintenance(PageAccounting pageAccounting) {
+    int pageId = pageAccounting.getPageId();
+    if (pageAccounting.isPruneEligible()) {
+      compactionCandidates.remove(pageId);
+      pages.deallocate(pageId);
+      pageBytes -= pageAccounting.sizeOf();
+    } else if (pageAccounting.isCompactionEligible()) {
+      compactionCandidates.add(pageId);
+    }
+  }
+
+  public TsBlock getPage(long rowId) {
+    if (isCursorRowId(rowId)) {
+      checkState(currentCursor != null, "No active cursor");
+      return currentCursor.getPage();
+    }
+    int pageId = rowIdBuffer.getPageId(rowId);
+    return pages.get(pageId).getPage();
+  }
+
+  public int getPosition(long rowId) {
+    if (isCursorRowId(rowId)) {
+      checkState(currentCursor != null, "No active cursor");
+      // rowId for cursors only reference the single current position
+      return currentCursor.getCurrentPosition();
+    }
+    return rowIdBuffer.getPosition(rowId);
+  }
+
+  private static boolean isCursorRowId(long rowId) {
+    return rowId == RESERVED_ROW_ID_FOR_CURSOR;
+  }
+
+  public void compactIfNeeded() {
+    for (int pageId : compactionCandidates) {
+      PageAccounting pageAccounting = pages.get(pageId);
+      pageBytes -= pageAccounting.sizeOf();
+      pageAccounting.compact();
+      pageBytes += pageAccounting.sizeOf();
+    }
+    compactionCandidates.clear();
+  }
+
+  public long sizeOf() {
+    return INSTANCE_SIZE
+        + pageBytes
+        + pages.sizeOf()
+        + rowIdBuffer.sizeOf()
+        + RamUsageEstimator.sizeOfHashSet(compactionCandidates);
+  }
+
+  /**
+   * Cursor that allows callers to advance through the registered page and 
dictate whether a
+   * specific position should be preserved with a stable row ID. Row ID 
generation can be expensive
+   * in tight loops, so this allows callers to quickly skip positions that 
won't be needed.
+   */
+  public static final class LoadCursor implements RowReference, AutoCloseable {
+    private final PageAccounting pageAccounting;
+    private final Runnable closeCallback;
+
+    private int currentPosition;
+
+    private LoadCursor(
+        PageAccounting pageAccounting, int startingPosition, Runnable 
closeCallback) {
+      this.pageAccounting = pageAccounting;
+      this.currentPosition = startingPosition - 1;
+      this.closeCallback = closeCallback;
+    }
+
+    private TsBlock getPage() {
+      return pageAccounting.getPage();
+    }
+
+    private int getCurrentPosition() {
+      checkState(currentPosition >= 0, "Not yet advanced");
+      return currentPosition;
+    }
+
+    public boolean advance() {
+      if (currentPosition >= pageAccounting.getPage().getPositionCount() - 1) {
+        return false;
+      }
+      currentPosition++;
+      return true;
+    }
+
+    @Override
+    public int compareTo(RowIdComparisonStrategy strategy, long rowId) {
+      checkState(currentPosition >= 0, "Not yet advanced");
+      return strategy.compare(RESERVED_ROW_ID_FOR_CURSOR, rowId);
+    }
+
+    @Override
+    public boolean equals(RowIdHashStrategy strategy, long rowId) {
+      checkState(currentPosition >= 0, "Not yet advanced");
+      return strategy.equals(RESERVED_ROW_ID_FOR_CURSOR, rowId);
+    }
+
+    @Override
+    public long hash(RowIdHashStrategy strategy) {
+      checkState(currentPosition >= 0, "Not yet advanced");
+      return strategy.hashCode(RESERVED_ROW_ID_FOR_CURSOR);
+    }
+
+    @Override
+    public long allocateRowId() {
+      checkState(currentPosition >= 0, "Not yet advanced");
+      return pageAccounting.referencePosition(currentPosition);
+    }
+
+    @Override
+    public void close() {
+      closeCallback.run();
+    }
+  }
+
+  private final class PageAccounting {
+    private static final int COMPACTION_MIN_FILL_MULTIPLIER = 2;
+
+    private final int pageId;
+    private TsBlock page;
+    private boolean isPageLoaded;
+    private long[] rowIds;
+    // Start off locked to give the caller time to declare which rows to 
reference
+    private boolean lockedPage = true;
+    private int activePositions;
+
+    public PageAccounting(int pageId, TsBlock page) {
+      this.pageId = pageId;
+      this.page = page;
+      rowIds = new long[page.getPositionCount()];
+      Arrays.fill(rowIds, RowIdBuffer.UNKNOWN_ID);
+    }
+
+    /** Record the position as referenced and return a corresponding stable 
row ID */
+    public long referencePosition(int position) {
+      long rowId = rowIds[position];
+      if (rowId == RowIdBuffer.UNKNOWN_ID) {
+        rowId = rowIdBuffer.allocateRowId(pageId, position);
+        rowIds[position] = rowId;
+        activePositions++;
+      }
+      return rowId;
+    }
+
+    /**
+     * Locks the current page so that it can't be compacted (thus allowing for 
stable position-based
+     * access).
+     */
+    public void lockPage() {
+      lockedPage = true;
+    }
+
+    /** Unlocks the current page so that it becomes eligible for compaction. */
+    public void unlockPage() {
+      lockedPage = false;
+    }
+
+    public int getPageId() {
+      return pageId;
+    }
+
+    public TsBlock getPage() {
+      return page;
+    }
+
+    /** Dereferences the row ID from this page. */
+    public void dereference(long rowId) {
+      int position = rowIdBuffer.getPosition(rowId);
+      checkArgument(rowId == rowIds[position], "rowId does not match this 
page");
+      rowIds[position] = RowIdBuffer.UNKNOWN_ID;
+      activePositions--;
+      rowIdBuffer.deallocate(rowId);
+    }
+
+    public boolean isPruneEligible() {
+      // Pruning is only allowed if the page is unlocked
+      return !lockedPage && activePositions == 0;
+    }
+
+    public boolean isCompactionEligible() {
+      // Compaction is only allowed if the page is unlocked
+      return !lockedPage
+          && activePositions * COMPACTION_MIN_FILL_MULTIPLIER < 
page.getPositionCount();
+    }
+
+    public void loadPageLoadIfNeeded() {
+      if (!isPageLoaded && activePositions > 0) {
+        //        page = page.getLoadedPage();
+        isPageLoaded = true;
+      }
+    }
+
+    public void compact() {
+      checkState(!lockedPage, "Should not attempt compaction when page is 
locked");
+
+      if (activePositions == page.getPositionCount()) {
+        return;
+      }
+
+      loadPageLoadIfNeeded();
+
+      int newIndex = 0;
+      int[] positionsToKeep = new int[activePositions];
+      long[] newRowIds = new long[activePositions];
+      for (int i = 0; i < page.getPositionCount() && newIndex < 
positionsToKeep.length; i++) {
+        long rowId = rowIds[i];
+        positionsToKeep[newIndex] = i;
+        newRowIds[newIndex] = rowId;
+        newIndex += rowId == RowIdBuffer.UNKNOWN_ID ? 0 : 1;
+      }
+      verify(newIndex == activePositions);
+      for (int i = 0; i < newRowIds.length; i++) {
+        rowIdBuffer.setPosition(newRowIds[i], i);
+      }
+
+      // Compact page
+      //      page = page.copyPositions(positionsToKeep, 0, 
positionsToKeep.length);
+      rowIds = newRowIds;
+    }
+
+    public long sizeOf() {
+      // Getting the size of a page forces a lazy page to be loaded, so only 
provide the size after
+      // an explicit decision to load
+      long loadedPageSize = isPageLoaded ? page.getSizeInBytes() : 0;
+      return PAGE_ACCOUNTING_INSTANCE_SIZE + loadedPageSize + 
RamUsageEstimator.sizeOf(rowIds);
+    }
+  }
+
+  /** Buffer abstracting a mapping between row IDs and their associated page 
IDs and positions. */
+  private static class RowIdBuffer {
+    public static final long UNKNOWN_ID = -1;
+    private static final long INSTANCE_SIZE =
+        RamUsageEstimator.shallowSizeOfInstance(RowIdBuffer.class);
+
+    /*
+     *  Memory layout:
+     *  [INT] pageId1, [INT] position1,
+     *  [INT] pageId2, [INT] position2,
+     *  ...
+     */
+    private final IntBigArray buffer = new IntBigArray();
+
+    private final LongBigArrayFIFOQueue emptySlots = new 
LongBigArrayFIFOQueue();
+
+    private long capacity;
+
+    /**
+     * Provides a new row ID referencing the provided page position.
+     *
+     * @return ID referencing the provided page position
+     */
+    public long allocateRowId(int pageId, int position) {
+      long newRowId;
+      if (!emptySlots.isEmpty()) {
+        newRowId = emptySlots.dequeueLong();
+      } else {
+        newRowId = capacity;
+        capacity++;
+        buffer.ensureCapacity(capacity * 2);
+      }
+
+      setPageId(newRowId, pageId);
+      setPosition(newRowId, position);
+
+      return newRowId;
+    }
+
+    public void deallocate(long rowId) {
+      emptySlots.enqueue(rowId);
+    }
+
+    public int getPageId(long rowId) {
+      return buffer.get(rowId * 2);
+    }
+
+    public void setPageId(long rowId, int pageId) {
+      buffer.set(rowId * 2, pageId);
+    }
+
+    public int getPosition(long rowId) {
+      return buffer.get(rowId * 2 + 1);
+    }
+
+    public void setPosition(long rowId, int position) {
+      buffer.set(rowId * 2 + 1, position);
+    }
+
+    public long sizeOf() {
+      return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java
new file mode 100644
index 00000000000..a675ec97fb1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java
@@ -0,0 +1,27 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * {@link TsBlockWithPositionComparator} backed by the sort-key comparator obtained from {@link
+ * MergeSortComparator}.
+ */
+public class SimpleTsBlockWithPositionComparator implements TsBlockWithPositionComparator {
+  private final Comparator<SortKey> comparator;
+
+  /**
+   * @param types data types handed to the comparator factory
+   * @param sortChannels channels to sort on
+   * @param sortItems sort items describing the ordering
+   */
+  public SimpleTsBlockWithPositionComparator(
+      List<TSDataType> types, List<Integer> sortChannels, List<SortItem> sortItems) {
+    comparator = MergeSortComparator.getComparator(sortItems, sortChannels, types);
+  }
+
+  /** Wraps both rows in sort keys and compares them with the configured comparator. */
+  @Override
+  public int compareTo(TsBlock left, int leftPosition, TsBlock right, int rightPosition) {
+    SortKey leftKey = new MergeSortKey(left, leftPosition);
+    SortKey rightKey = new MergeSortKey(right, rightPosition);
+    return comparator.compare(leftKey, rightKey);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java
new file mode 100644
index 00000000000..dcb7e8ca621
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java
@@ -0,0 +1,7 @@
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+/** Compares two rows, each addressed by a {@link TsBlock} and a position inside it. */
+public interface TsBlockWithPositionComparator {
+  /**
+   * Compares the row at {@code leftPosition} of {@code left} with the row at {@code
+   * rightPosition} of {@code right}.
+   *
+   * @return the comparison result
+   */
+  int compareTo(TsBlock left, int leftPosition, TsBlock right, int rightPosition);
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java
new file mode 100644
index 00000000000..eeac35d9189
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** Operator that returns, one by one, the TsBlocks it was constructed with. */
+public class ValuesOperator implements Operator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(ValuesOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Iterator<TsBlock> tsBlocks;
+  private final long maxTsBlockSize;
+  private long currentRetainedSize;
+
+  /**
+   * @param operatorContext context of this operator, must not be null
+   * @param tsBlocks blocks to emit in list order, must not be null
+   */
+  public ValuesOperator(OperatorContext operatorContext, List<TsBlock> tsBlocks) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    requireNonNull(tsBlocks, "tsBlocks is null");
+
+    this.tsBlocks = ImmutableList.copyOf(tsBlocks).iterator();
+
+    // Track the largest single block and the sum of all retained sizes
+    long largest = 0;
+    long sum = 0;
+    for (TsBlock block : tsBlocks) {
+      long retained = block.getRetainedSizeInBytes();
+      if (retained > largest) {
+        largest = retained;
+      }
+      sum += retained;
+    }
+
+    this.maxTsBlockSize = largest;
+    this.currentRetainedSize = sum;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return NOT_BLOCKED;
+  }
+
+  /** Returns the next stored block, or {@code null} once all blocks have been returned. */
+  @Override
+  public TsBlock next() throws Exception {
+    TsBlock emitted = tsBlocks.hasNext() ? tsBlocks.next() : null;
+    if (emitted != null) {
+      // The emitted block is no longer retained by this operator
+      currentRetainedSize -= emitted.getRetainedSizeInBytes();
+    }
+    return emitted;
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return tsBlocks.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !tsBlocks.hasNext();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxTsBlockSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxTsBlockSize;
+  }
+
+  /** Returns the summed retained size of the blocks that have not been emitted yet. */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return currentRetainedSize;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    long contextBytes = MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
+    return INSTANCE_SIZE + contextBytes;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java
new file mode 100644
index 00000000000..8a31aa11c25
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash;
+
+public class RowNumberOperator implements ProcessOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(RowNumberOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator inputOperator;
+  private final List<Integer> outputChannels;
+  private final List<Integer> partitionChannels;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private final Optional<GroupByHash> groupByHash;
+  private final Optional<Integer> maxRowsPerPartition;
+  private final Map<Integer, Long> partitionRowCounts;
+
+  public RowNumberOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> inputDataTypes,
+      List<Integer> outputChannels,
+      List<Integer> partitionChannels,
+      Optional<Integer> maxRowsPerPartition,
+      int expectedPositions) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.outputChannels = ImmutableList.copyOf(outputChannels);
+    this.partitionChannels = ImmutableList.copyOf(partitionChannels);
+    this.maxRowsPerPartition = maxRowsPerPartition;
+
+    // Output data types
+    // original output channels + row number column
+    List<TSDataType> outputDataTypes = new ArrayList<>();
+    for (int channel : outputChannels) {
+      outputDataTypes.add(inputDataTypes.get(channel));
+    }
+    outputDataTypes.add(TSDataType.INT64);
+    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+
+    if (partitionChannels.isEmpty()) {
+      this.groupByHash = Optional.empty();
+    } else {
+      // Partition data types
+      List<Type> partitionDataTypes = new ArrayList<>();
+      for (int channel : partitionChannels) {
+        TSDataType tsDataType = inputDataTypes.get(channel);
+        Type convertType = InternalTypeManager.fromTSDataType(tsDataType);
+        partitionDataTypes.add(convertType);
+      }
+      this.groupByHash =
+          Optional.of(
+              createGroupByHash(partitionDataTypes, false, expectedPositions, 
UpdateMemory.NOOP));
+    }
+
+    this.partitionRowCounts = new HashMap<>(expectedPositions);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    TsBlock tsBlock = inputOperator.nextWithTimer();
+    if (tsBlock == null) {
+      return null;
+    }
+
+    int[] partitionIds = getTsBlockPartitionIds(tsBlock);
+    for (int position = 0; position < tsBlock.getPositionCount(); position++) {
+      int partitionId = groupByHash.isPresent() ? partitionIds[position] : 0;
+      long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L);
+      processRow(tsBlock, partitionId, rowCount + 1);
+      partitionRowCounts.put(partitionId, rowCount + 1);
+    }
+
+    TsBlock result =
+        tsBlockBuilder.build(
+            new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
tsBlockBuilder.getPositionCount()));
+    tsBlockBuilder.reset();
+    return result;
+  }
+
+  private void processRow(TsBlock tsBlock, int position, long rowNumber) {
+    // Check max rows per partition limit
+    if (maxRowsPerPartition.isPresent() && rowNumber >= 
maxRowsPerPartition.get()) {
+      return; // Skip this row, partition has reached limit
+    }
+
+    // Copy origin values
+    for (int i = 0; i < outputChannels.size(); i++) {
+      Column column = tsBlock.getColumn(outputChannels.get(i));
+      ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(i);
+      if (column.isNull(position)) {
+        columnBuilder.appendNull();
+      } else {
+        columnBuilder.write(column, position);
+      }
+    }
+    // Write row number
+    int rowNumberChannel = outputChannels.size();
+    ColumnBuilder columnBuilder = 
tsBlockBuilder.getColumnBuilder(rowNumberChannel);
+    columnBuilder.writeLong(rowNumber);
+
+    tsBlockBuilder.declarePosition();
+  }
+
+  private int[] getTsBlockPartitionIds(TsBlock tsBlock) {
+    if (groupByHash.isPresent()) {
+      Column[] partitionColumns = new Column[partitionChannels.size()];
+      for (int i = 0; i < partitionChannels.size(); i++) {
+        partitionColumns[i] = tsBlock.getColumn(partitionChannels.get(i));
+      }
+      return groupByHash.get().getGroupIds(partitionColumns);
+    } else {
+      return new int[] {0};
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return inputOperator.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    inputOperator.close();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !this.hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemoryFromInput = 
inputOperator.calculateMaxPeekMemoryWithCounter();
+    long maxPeekMemoryFromCurrent =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    return Math.max(maxPeekMemoryFromInput, maxPeekMemoryFromCurrent)
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + tsBlockBuilder.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return inputOperator.isBlocked();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
new file mode 100644
index 00000000000..b89ed378340
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java
@@ -0,0 +1,277 @@
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNBuilder;
+import 
org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNRowNumberBuilder;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.SimpleTsBlockWithPositionComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.TsBlockWithPositionComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.NoChannelGroupByHash;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Process operator backing {@code TopKRankingNode}.
+ *
+ * <p>The constructor prepares two suppliers: one for the {@link GroupByHash} that assigns
+ * partition ids and one for the {@link GroupedTopNBuilder} that accumulates rows. Only the {@code
+ * ROW_NUMBER} ranking type has a builder; any other ranking type makes the constructor throw.
+ * Presumably the builder keeps at most {@code maxRowCountPerPartition} rows per partition - the
+ * selection itself lives in the builder, not in this class.
+ *
+ * <p>NOTE(review): nothing in this class reads from {@code inputOperator}, assigns {@code
+ * groupedTopNBuilder} or {@code groupByHash}, or sets {@code finishing}. As written, {@link
+ * #next()} always returns {@code null}, {@link #hasNext()} always returns {@code false} and
+ * {@link #isFinished()} always returns {@code false}. The input-consumption path still has to be
+ * added, and {@link #ramBytesUsed()} still reports 0.
+ */
+public class TopKRankingOperator implements ProcessOperator {
+  private final OperatorContext operatorContext;
+  private final Operator inputOperator;
+  private final TopKRankingNode.RankingType rankingType;
+  private final List<TSDataType> inputTypes;
+
+  private final List<Integer> outputChannels;
+  private final List<Integer> partitionChannels;
+  private final List<TSDataType> partitionTypes;
+  private final List<Integer> sortChannels;
+  private final List<SortItem> sortItems;
+  private final int maxRowCountPerPartition;
+  private final boolean partial;
+  private final boolean generateRanking;
+  private final Optional<Integer> hashChannel;
+  private final int expectedPositions;
+
+  private final long maxFlushableBytes;
+
+  private final Supplier<GroupByHash> groupByHashSupplier;
+  private final Supplier<GroupedTopNBuilder> groupedTopNBuilderSupplier;
+
+  private GroupByHash groupByHash;
+  private GroupedTopNBuilder groupedTopNBuilder;
+  private boolean finishing;
+  private java.util.Iterator<TsBlock> outputIterator;
+
+  /**
+   * Stores the configuration and creates the hash and builder suppliers.
+   *
+   * <p>The operator runs in "partial" mode exactly when {@code generateRanking} is {@code false}.
+   * When {@code generateRanking} is {@code true}, one extra output channel (index {@code
+   * outputChannels.size()}) is appended after the given output channels.
+   *
+   * @param maxPartialMemory flush threshold in bytes for partial mode; 0 when absent
+   * @throws UnsupportedOperationException if {@code rankingType} is {@code DENSE_RANK}
+   * @throws IllegalArgumentException if {@code rankingType} is any other non-{@code ROW_NUMBER}
+   *     value
+   */
+  public TopKRankingOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      TopKRankingNode.RankingType rankingType,
+      List<TSDataType> inputTypes,
+      List<Integer> outputChannels,
+      List<Integer> partitionChannels,
+      List<TSDataType> partitionTypes,
+      List<Integer> sortChannels,
+      List<SortItem> sortItems,
+      int maxRowCountPerPartition,
+      boolean generateRanking,
+      Optional<Integer> hashChannel,
+      int expectedPositions,
+      Optional<Long> maxPartialMemory) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.rankingType = rankingType;
+    this.inputTypes = inputTypes;
+    this.partitionChannels = partitionChannels;
+    this.partitionTypes = partitionTypes;
+    this.sortChannels = sortChannels;
+    this.sortItems = sortItems;
+    this.maxRowCountPerPartition = maxRowCountPerPartition;
+    this.generateRanking = generateRanking;
+    this.partial = !generateRanking;
+    this.hashChannel = hashChannel;
+    this.expectedPositions = expectedPositions;
+    this.maxFlushableBytes = maxPartialMemory.orElse(0L);
+
+    // Output layout: the requested channels, optionally followed by the ranking channel.
+    ImmutableList.Builder<Integer> channels = ImmutableList.builder();
+    channels.addAll(outputChannels);
+    if (generateRanking) {
+      channels.add(outputChannels.size());
+    }
+    this.outputChannels = channels.build();
+
+    this.groupByHashSupplier =
+        getGroupByHashSupplier(
+            expectedPositions,
+            partitionTypes,
+            hashChannel.isPresent(),
+            operatorContext.getSessionInfo(),
+            UpdateMemory.NOOP);
+
+    this.groupedTopNBuilderSupplier =
+        getGroupedTopNBuilderSupplier(
+            rankingType,
+            inputTypes,
+            partitionChannels,
+            sortChannels,
+            sortItems,
+            maxRowCountPerPartition,
+            generateRanking,
+            groupByHashSupplier);
+  }
+
+  /**
+   * Builds the supplier of the partition hash. Without partition types a single shared {@link
+   * NoChannelGroupByHash} instance is supplied; otherwise every call creates a new hash over the
+   * converted partition types.
+   */
+  private static Supplier<GroupByHash> getGroupByHashSupplier(
+      int expectedPositions,
+      List<TSDataType> partitionTsDataTypes,
+      boolean hasPrecomputedHash,
+      SessionInfo session,
+      UpdateMemory updateMemory) {
+
+    if (partitionTsDataTypes.isEmpty()) {
+      return Suppliers.ofInstance(new NoChannelGroupByHash());
+    }
+
+    // Convert once, up front; the supplier below reuses the converted list on every call.
+    List<Type> convertedTypes = new ArrayList<>(partitionTsDataTypes.size());
+    partitionTsDataTypes.forEach(
+        tsDataType -> convertedTypes.add(InternalTypeManager.fromTSDataType(tsDataType)));
+
+    return () ->
+        GroupByHash.createGroupByHash(
+            convertedTypes, hasPrecomputedHash, expectedPositions, updateMemory);
+  }
+
+  /**
+   * Builds the supplier of the grouped top-N builder for the given ranking type.
+   *
+   * <p>Only {@code ROW_NUMBER} is supported. The row comparator is created once here; the
+   * partition-channel array and the group-by hash are obtained anew on every supplier call.
+   *
+   * @throws UnsupportedOperationException for {@code DENSE_RANK}
+   * @throws IllegalArgumentException for every other ranking type that is not {@code ROW_NUMBER}
+   */
+  private static Supplier<GroupedTopNBuilder> getGroupedTopNBuilderSupplier(
+      TopKRankingNode.RankingType rankingType,
+      List<TSDataType> sourceTypes,
+      List<Integer> partitionChannels,
+      List<Integer> sortChannels,
+      List<SortItem> sortItems,
+      int maxRankingPerPartition,
+      boolean generateRanking,
+      Supplier<GroupByHash> groupByHashSupplier) {
+
+    if (rankingType != TopKRankingNode.RankingType.ROW_NUMBER) {
+      // TODO: a rank-based builder is not wired in yet; such types end in the generic failure.
+      if (rankingType == TopKRankingNode.RankingType.DENSE_RANK) {
+        throw new UnsupportedOperationException("DENSE_RANK not yet implemented");
+      }
+      throw new IllegalArgumentException("Unknown ranking type: " + rankingType);
+    }
+
+    TsBlockWithPositionComparator rowComparator =
+        new SimpleTsBlockWithPositionComparator(sourceTypes, sortChannels, sortItems);
+    return () ->
+        new GroupedTopNRowNumberBuilder(
+            sourceTypes,
+            rowComparator,
+            maxRankingPerPartition,
+            generateRanking,
+            partitionChannels.stream().mapToInt(Integer::intValue).toArray(),
+            groupByHashSupplier.get());
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Returns the next buffered result block, or {@code null} when nothing can be emitted.
+   *
+   * <p>Output starts once the operator is finishing, or - in partial mode - once the builder has
+   * reached the flush threshold. When the result iterator is exhausted (or there is none) the
+   * builder state is dropped and {@code null} is returned.
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    boolean flushDue = finishing || (partial && isBuilderFull());
+    if (!flushDue && outputIterator == null) {
+      // Nothing to flush yet and no flush in progress.
+      return null;
+    }
+
+    if (outputIterator == null && groupedTopNBuilder != null) {
+      // First call of a flush: materialize the builder's result.
+      outputIterator = groupedTopNBuilder.getResult();
+    }
+
+    if (outputIterator == null || !outputIterator.hasNext()) {
+      closeGroupedTopNBuilder();
+      return null;
+    }
+    return outputIterator.next();
+  }
+
+  /**
+   * Tells whether a call to {@link #next()} may produce output: a flush is in progress with
+   * blocks left, or a flush is due (partial mode with a full builder, or finishing) and the
+   * operator is not already drained.
+   */
+  @Override
+  public boolean hasNext() throws Exception {
+    if (outputIterator != null && outputIterator.hasNext()) {
+      return true;
+    }
+
+    boolean drained = finishing && outputIterator == null && groupedTopNBuilder == null;
+    if (drained) {
+      return false;
+    }
+
+    boolean partialFlushDue = partial && isBuilderFull();
+    return partialFlushDue || finishing;
+  }
+
+  /** Drops the builder state and closes the child operator when there is one. */
+  @Override
+  public void close() throws Exception {
+    closeGroupedTopNBuilder();
+    if (inputOperator != null) {
+      inputOperator.close();
+    }
+  }
+
+  /** Finished means: finishing was requested and neither an iterator nor a builder remains. */
+  @Override
+  public boolean isFinished() throws Exception {
+    if (!finishing) {
+      return false;
+    }
+    return outputIterator == null && groupedTopNBuilder == null;
+  }
+
+  /**
+   * Peak estimate: the larger of (max TsBlock size + current builder size) and the child's peak,
+   * plus what the child retains after {@code next()}.
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childPeak = inputOperator.calculateMaxPeekMemoryWithCounter();
+    long blockBudget = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    long builderBytes = 0;
+    if (groupedTopNBuilder != null) {
+      builderBytes = groupedTopNBuilder.getEstimatedSizeInBytes();
+    }
+    return Math.max(blockBudget + builderBytes, childPeak)
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+  }
+
+  /** Child's retained size plus the builder's estimated size when a builder exists. */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long childRetained = inputOperator.calculateRetainedSizeAfterCallingNext();
+    if (groupedTopNBuilder == null) {
+      return childRetained;
+    }
+    return childRetained + groupedTopNBuilder.getEstimatedSizeInBytes();
+  }
+
+  /** Forgets the builder, the hash and any in-progress result iterator. */
+  private void closeGroupedTopNBuilder() {
+    groupedTopNBuilder = null;
+    groupByHash = null;
+    outputIterator = null;
+  }
+
+  /** A builder is "full" once its estimated size reaches {@code maxFlushableBytes}. */
+  private boolean isBuilderFull() {
+    if (groupedTopNBuilder == null) {
+      return false;
+    }
+    return groupedTopNBuilder.getEstimatedSizeInBytes() >= maxFlushableBytes;
+  }
+
+  /** NOTE(review): memory accounting is not implemented yet; this always reports 0. */
+  @Override
+  public long ramBytesUsed() {
+    return 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java
new file mode 100644
index 00000000000..7ef6fdc01f1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java
@@ -0,0 +1,182 @@
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+
+/**
+ * Growable FIFO queue of primitive {@code int} values backed by a cyclic array; elements can be
+ * added and removed at both ends.
+ *
+ * <p>{@code start} is the index of the first element and {@code end} the index one past the last
+ * element, both taken modulo {@code length}. The queue is empty when {@code start == end}, so the
+ * backing array always has at least one unused slot. The array doubles when it becomes full and
+ * halves when at most a quarter of it is in use.
+ *
+ * <p>Serialization writes the element count followed by the elements in dequeue order; the
+ * array-related fields are transient and rebuilt on read.
+ */
+public class IntArrayFIFOQueue implements Serializable {
+  private static final long serialVersionUID = 0L;
+
+  /** Capacity used by the no-argument constructor; the queue never shrinks at or below it. */
+  public static final int INITIAL_CAPACITY = 4;
+
+  /** Largest backing-array length this queue will allocate (2147483639). */
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  protected transient int[] array;
+  protected transient int length;
+  protected transient int start;
+  protected transient int end;
+
+  /**
+   * Creates an empty queue able to hold {@code capacity} elements before growing.
+   *
+   * @param capacity the initial capacity
+   * @throws IllegalArgumentException if {@code capacity} is negative or exceeds 2147483638
+   */
+  public IntArrayFIFOQueue(int capacity) {
+    if (capacity > MAX_ARRAY_SIZE - 1) {
+      throw new IllegalArgumentException(
+          "Initial capacity (" + capacity + ") exceeds " + (MAX_ARRAY_SIZE - 1));
+    }
+    if (capacity < 0) {
+      throw new IllegalArgumentException("Initial capacity (" + capacity + ") is negative");
+    }
+    // One extra slot distinguishes "full" from "empty" in the cyclic layout.
+    this.array = new int[Math.max(1, capacity + 1)];
+    this.length = this.array.length;
+  }
+
+  /** Creates an empty queue with {@link #INITIAL_CAPACITY}. */
+  public IntArrayFIFOQueue() {
+    this(INITIAL_CAPACITY);
+  }
+
+  /** Always returns {@code null}: a FIFO queue has no ordering comparator. */
+  public IntComparator comparator() {
+    return null;
+  }
+
+  /**
+   * Removes and returns the first element.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public int dequeueInt() {
+    if (this.start == this.end) {
+      throw new NoSuchElementException();
+    }
+    int t = this.array[this.start];
+    if (++this.start == this.length) {
+      this.start = 0;
+    }
+    this.reduce();
+    return t;
+  }
+
+  /**
+   * Removes and returns the last element.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public int dequeueLastInt() {
+    if (this.start == this.end) {
+      throw new NoSuchElementException();
+    }
+    if (this.end == 0) {
+      this.end = this.length;
+    }
+    int t = this.array[--this.end];
+    this.reduce();
+    return t;
+  }
+
+  /**
+   * Moves the {@code size} stored elements to the front of a new array of {@code newLength}.
+   *
+   * <p>{@code start >= end} with a non-zero size means the content wraps around (or the array is
+   * completely full), so it is copied in two pieces.
+   */
+  private void resize(int size, int newLength) {
+    int[] newArray = new int[newLength];
+    if (this.start >= this.end) {
+      if (size != 0) {
+        System.arraycopy(this.array, this.start, newArray, 0, this.length - this.start);
+        System.arraycopy(this.array, 0, newArray, this.length - this.start, this.end);
+      }
+    } else {
+      System.arraycopy(this.array, this.start, newArray, 0, this.end - this.start);
+    }
+
+    this.start = 0;
+    this.end = size;
+    this.array = newArray;
+    this.length = newLength;
+  }
+
+  /** Doubles the backing array, capped at {@code MAX_ARRAY_SIZE}; called when it is full. */
+  private void expand() {
+    this.resize(this.length, (int) Math.min((long) MAX_ARRAY_SIZE, 2L * (long) this.length));
+  }
+
+  /** Halves the backing array when at most a quarter of it is in use. */
+  private void reduce() {
+    int size = this.size();
+    if (this.length > INITIAL_CAPACITY && size <= this.length / 4) {
+      this.resize(size, this.length / 2);
+    }
+  }
+
+  /** Appends {@code x} as the last element. */
+  public void enqueue(int x) {
+    this.array[this.end++] = x;
+    if (this.end == this.length) {
+      this.end = 0;
+    }
+
+    // end catching up with start means the array is full.
+    if (this.end == this.start) {
+      this.expand();
+    }
+  }
+
+  /** Inserts {@code x} as the first element (the next one to be dequeued). */
+  public void enqueueFirst(int x) {
+    if (this.start == 0) {
+      this.start = this.length;
+    }
+
+    this.array[--this.start] = x;
+    if (this.end == this.start) {
+      this.expand();
+    }
+  }
+
+  /**
+   * Returns the first element without removing it.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public int firstInt() {
+    if (this.start == this.end) {
+      throw new NoSuchElementException();
+    }
+    return this.array[this.start];
+  }
+
+  /**
+   * Returns the last element without removing it.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public int lastInt() {
+    if (this.start == this.end) {
+      throw new NoSuchElementException();
+    }
+    return this.array[(this.end == 0 ? this.length : this.end) - 1];
+  }
+
+  /** Removes all elements; the backing array keeps its current length. */
+  public void clear() {
+    this.start = 0;
+    this.end = 0;
+  }
+
+  /** Shrinks the backing array to the smallest length that holds the current content. */
+  public void trim() {
+    int size = this.size();
+    int[] newArray = new int[size + 1];
+    if (this.start <= this.end) {
+      System.arraycopy(this.array, this.start, newArray, 0, this.end - this.start);
+    } else {
+      System.arraycopy(this.array, this.start, newArray, 0, this.length - this.start);
+      System.arraycopy(this.array, 0, newArray, this.length - this.start, this.end);
+    }
+
+    this.start = 0;
+    this.end = size;
+    this.length = size + 1;
+    this.array = newArray;
+  }
+
+  /** Returns the number of elements in the queue. */
+  public int size() {
+    int apparentLength = this.end - this.start;
+    return apparentLength >= 0 ? apparentLength : this.length + apparentLength;
+  }
+
+  /** Returns {@code true} when the queue holds no elements. */
+  public boolean isEmpty() {
+    return this.start == this.end;
+  }
+
+  /** Writes the element count followed by the elements in dequeue order. */
+  private void writeObject(ObjectOutputStream s) throws IOException {
+    s.defaultWriteObject();
+    int size = this.size();
+    s.writeInt(size);
+    int i = this.start;
+
+    while (size-- != 0) {
+      s.writeInt(this.array[i++]);
+      if (i == this.length) {
+        i = 0;
+      }
+    }
+  }
+
+  /**
+   * Rebuilds the queue from the element count and elements written by {@code writeObject}.
+   *
+   * @throws java.io.InvalidObjectException if the stored element count is negative or too large
+   *     for a backing array, which would otherwise leave the queue in a corrupt state
+   */
+  private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
+    s.defaultReadObject();
+    final int size = s.readInt();
+    if (size < 0 || size >= MAX_ARRAY_SIZE) {
+      throw new java.io.InvalidObjectException("Invalid queue size in stream: " + size);
+    }
+
+    // Rounding up to a power of two would overflow int beyond 2^30; use the exact length there.
+    final int minLength = size + 1;
+    final int newLength = minLength > (1 << 30) ? minLength : nextPowerOfTwo(minLength);
+    final int[] elements = new int[newLength];
+    for (int i = 0; i < size; ++i) {
+      elements[i] = s.readInt();
+    }
+
+    this.array = elements;
+    this.length = newLength;
+    this.start = 0;
+    this.end = size;
+  }
+
+  /** Smallest power of two that is {@code >= x}, for {@code 1 <= x <= 2^30}. */
+  private static int nextPowerOfTwo(int x) {
+    return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java
new file mode 100644
index 00000000000..b87053254d9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java
@@ -0,0 +1,178 @@
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array;
+
+import java.util.NoSuchElementException;
+
+import static java.lang.Math.toIntExact;
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
+
+/**
+ * FIFO queue of {@code long} values stored cyclically in a {@link LongBigArray}; elements can be
+ * added and removed at both ends. The queue is empty when {@code start == end}.
+ */
+public class LongBigArrayFIFOQueue {
+  private static final long INSTANCE_SIZE = shallowSizeOfInstance(LongBigArrayFIFOQueue.class);
+
+  /** Capacity of the no-argument constructor and lower bound for any requested capacity. */
+  public static final long INITIAL_CAPACITY = BigArrays.SEGMENT_SIZE;
+
+  /** The backing array. */
+  protected LongBigArray array;
+
+  /** Cached logical length of {@link #array}; positions wrap around at this value. */
+  protected long length;
+
+  /** Position of the first element. It is always strictly smaller than {@link #length}. */
+  protected long start;
+
+  /**
+   * Position one past the last element. It is always strictly smaller than {@link #length} and
+   * may be smaller than {@link #start} because {@link #array} is used cyclically.
+   */
+  protected long end;
+
+  /**
+   * Creates an empty queue with at least the given capacity.
+   *
+   * @param capacity the requested initial capacity; raised to {@link #INITIAL_CAPACITY} if lower
+   * @throws IllegalArgumentException if {@code capacity} is negative
+   */
+  public LongBigArrayFIFOQueue(final long capacity) {
+    if (capacity < 0) {
+      throw new IllegalArgumentException("Initial capacity (" + capacity + ") is negative");
+    }
+    array = new LongBigArray();
+    // Never build a queue smaller than INITIAL_CAPACITY.
+    length = capacity > INITIAL_CAPACITY ? capacity : INITIAL_CAPACITY;
+    array.ensureCapacity(length);
+  }
+
+  /** Creates an empty queue with the standard {@linkplain #INITIAL_CAPACITY initial capacity}. */
+  public LongBigArrayFIFOQueue() {
+    this(INITIAL_CAPACITY);
+  }
+
+  /** Returns the shallow size of this instance plus the size reported by the backing array. */
+  public long sizeOf() {
+    return INSTANCE_SIZE + array.sizeOf();
+  }
+
+  /**
+   * Removes and returns the first element.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public long dequeueLong() {
+    if (end == start) {
+      throw new NoSuchElementException();
+    }
+    final long head = array.get(start);
+    start++;
+    if (start == length) {
+      start = 0;
+    }
+    reduce();
+    return head;
+  }
+
+  /**
+   * Removes and returns the last element.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public long dequeueLastLong() {
+    if (end == start) {
+      throw new NoSuchElementException();
+    }
+    if (end == 0) {
+      end = length;
+    }
+    end--;
+    final long tail = array.get(end);
+    reduce();
+    return tail;
+  }
+
+  /**
+   * Moves the {@code size} stored elements to the front of a fresh array of {@code newLength}.
+   * Content that wraps around (or fills the whole array) is copied in two pieces.
+   */
+  private void resize(final long size, final long newLength) {
+    final LongBigArray replacement = new LongBigArray();
+    replacement.ensureCapacity(newLength);
+    if (start < end) {
+      array.copyTo(start, replacement, 0, end - start);
+    } else if (size != 0) {
+      final long headRun = length - start;
+      array.copyTo(start, replacement, 0, headRun);
+      array.copyTo(0, replacement, headRun, end);
+    }
+    start = 0;
+    end = size;
+    array = replacement;
+    length = newLength;
+  }
+
+  /** Doubles the backing array; called when it has become full. */
+  private void expand() {
+    resize(length, 2L * length);
+  }
+
+  /** Halves the backing array when at most a quarter of it is in use. */
+  private void reduce() {
+    final long size = longSize();
+    if (length > INITIAL_CAPACITY && size <= length / 4) {
+      resize(size, length / 2);
+    }
+  }
+
+  /** Appends {@code x} as the last element. */
+  public void enqueue(long x) {
+    array.set(end++, x);
+    if (end == length) {
+      end = 0;
+    }
+    // end catching up with start means the array is full.
+    if (end == start) {
+      expand();
+    }
+  }
+
+  /**
+   * Enqueues a new element as the first element (in dequeuing order) of the queue.
+   *
+   * @param x the element to enqueue.
+   */
+  public void enqueueFirst(long x) {
+    start = (start == 0 ? length : start) - 1;
+    array.set(start, x);
+    if (end == start) {
+      expand();
+    }
+  }
+
+  /**
+   * Returns the first element without removing it.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public long firstLong() {
+    if (end == start) {
+      throw new NoSuchElementException();
+    }
+    return array.get(start);
+  }
+
+  /**
+   * Returns the last element without removing it.
+   *
+   * @throws NoSuchElementException if the queue is empty
+   */
+  public long lastLong() {
+    if (end == start) {
+      throw new NoSuchElementException();
+    }
+    final long pastLast = end == 0 ? length : end;
+    return array.get(pastLast - 1);
+  }
+
+  /** Removes all elements; the backing array is left as it is. */
+  public void clear() {
+    start = 0;
+    end = 0;
+  }
+
+  /** Trims the queue to the smallest possible size. */
+  public void trim() {
+    final long size = longSize();
+    final LongBigArray compact = new LongBigArray();
+    compact.ensureCapacity(size + 1);
+    if (start > end) {
+      final long headRun = length - start;
+      array.copyTo(start, compact, 0, headRun);
+      array.copyTo(0, compact, headRun, end);
+    } else {
+      array.copyTo(start, compact, 0, end - start);
+    }
+    start = 0;
+    end = size;
+    length = size + 1;
+    array = compact;
+  }
+
+  /**
+   * Returns the number of elements as an {@code int}.
+   *
+   * @throws ArithmeticException if the element count does not fit in an {@code int}
+   */
+  public int size() {
+    return toIntExact(longSize());
+  }
+
+  /** Returns the number of elements in the queue. */
+  public long longSize() {
+    final long span = end - start;
+    if (span < 0) {
+      // The content wraps around the end of the array.
+      return length + span;
+    }
+    return span;
+  }
+
+  /** Returns {@code true} when the queue holds no elements. */
+  public boolean isEmpty() {
+    return end == start;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java
new file mode 100644
index 00000000000..3d530c809f9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java
@@ -0,0 +1,53 @@
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+/**
+ * {@link GroupByHash} for the case without grouping channels: every row belongs to the single
+ * group with id 0.
+ *
+ * <p>The group count is 0 until a non-empty page has been seen through {@link #addPage} or
+ * {@link #getGroupIds}, and 1 afterwards.
+ *
+ * <p>NOTE(review): the row count is read from {@code groupedColumns[0]}, so callers must pass at
+ * least one column even though no column is used for grouping; an empty array fails with {@link
+ * ArrayIndexOutOfBoundsException}. Confirm that every caller satisfies this.
+ */
+public class NoChannelGroupByHash implements GroupByHash {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(NoChannelGroupByHash.class);
+
+  /** 0 until the first non-empty page is seen, then 1. */
+  private int groupCount;
+
+  /** Returns the shallow instance size; this class holds no further data. */
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  @Override
+  public int getGroupCount() {
+    return groupCount;
+  }
+
+  /**
+   * Not supported: there are no group values to append.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void appendValuesTo(int groupId, TsBlockBuilder pageBuilder) {
+    throw new UnsupportedOperationException("NoChannelGroupByHash does not support appendValuesTo");
+  }
+
+  /** Registers the single group if the page contains at least one row. */
+  @Override
+  public void addPage(Column[] groupedColumns) {
+    updateGroupCount(groupedColumns);
+  }
+
+  /**
+   * Assigns group id 0 to every row of the page and registers the single group if the page
+   * contains at least one row, so that {@link #getGroupCount()} agrees with the ids handed out.
+   *
+   * @param groupedColumns the page columns; must contain at least one column
+   * @return an all-zero array with one entry per row of {@code groupedColumns[0]}
+   */
+  @Override
+  public int[] getGroupIds(Column[] groupedColumns) {
+    updateGroupCount(groupedColumns);
+    return new int[groupedColumns[0].getPositionCount()];
+  }
+
+  /**
+   * Not supported: there are no group values to hash.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public long getRawHash(int groupId) {
+    throw new UnsupportedOperationException("NoChannelGroupByHash does not support getRawHash");
+  }
+
+  /** Returns the fixed value 2. */
+  @Override
+  public int getCapacity() {
+    return 2;
+  }
+
+  /** Switches the group count from 0 to 1 once a page with at least one row is seen. */
+  private void updateGroupCount(Column[] columns) {
+    if (columns[0].getPositionCount() > 0 && groupCount == 0) {
+      groupCount = 1;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 56ad6d59ba0..6ab628a64b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -316,6 +316,9 @@ public enum PlanNodeType {
   TABLE_UNION_NODE((short) 1034),
   TABLE_INTERSECT_NODE((short) 1035),
   TABLE_EXCEPT_NODE((short) 1036),
+  TABLE_TOPK_RANKING_NODE((short) 1037),
+  TABLE_ROW_NUMBER_NODE((short) 1038),
+  TABLE_VALUES_NODE((short) 1039),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index ea669491a5f..5658cf04129 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -132,6 +132,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -773,6 +774,16 @@ public abstract class PlanVisitor<R, C> {
     return visitMultiChildProcess(node, context);
   }
 
+  /**
+   * Visits a {@code TopKRankingNode}; the default forwards to {@code visitMultiChildProcess}.
+   *
+   * <p>NOTE(review): {@code visitRowNumber} forwards to {@code visitSingleChildProcess} instead -
+   * confirm which one matches the child arity of {@code TopKRankingNode}.
+   */
+  public R visitTopKRanking(
+      
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode 
node,
+      C context) {
+    return visitMultiChildProcess(node, context);
+  }
+
+  /** Visits a {@code RowNumberNode}; the default forwards to {@code visitSingleChildProcess}. */
+  public R visitRowNumber(RowNumberNode node, C context) {
+    return visitSingleChildProcess(node, context);
+  }
+
   public R visitJoin(
       org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode 
node, C context) {
     return visitTwoChildProcess(node, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java
new file mode 100644
index 00000000000..3e301f8e3a3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java
@@ -0,0 +1,315 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+import 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.PropertyPattern;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictOutputs;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.transpose;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.project;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+public class GatherAndMergeWindows {
+  private GatherAndMergeWindows() {}
+
+  /**
+   * Creates the rule set of this optimization: for each supported number of intermediate
+   * ProjectNodes (0 through 4) one MergeAdjacentWindowsOverProjects rule and one
+   * SwapAdjacentWindowsBySpecifications rule, i.e. ten rules in total.
+   */
+  public static Set<Rule<?>> rules() {
+    // TODO convert to a pattern that allows for a sequence of ProjectNode, instead
+    // of a canned number, once the pattern system supports it.
+    return IntStream.range(0, 5)
+        .boxed()
+        .flatMap(
+            numProjects ->
+                Stream.of(
+                    new MergeAdjacentWindowsOverProjects(numProjects),
+                    new SwapAdjacentWindowsBySpecifications(numProjects)))
+        .collect(toImmutableSet());
+  }
+
+  /**
+   * Base class for rules that match a WindowNode whose source is, through exactly {@code
+   * numProjects} ProjectNodes, another WindowNode. The descendant window is first pulled above the
+   * captured projects; the subclass then decides how to rewrite the two adjacent windows.
+   */
+  private abstract static class ManipulateAdjacentWindowsOverProjects 
implements Rule<WindowNode> {
+    private final Capture<WindowNode> childCapture = newCapture();
+    private final List<Capture<ProjectNode>> projectCaptures;
+    private final Pattern<WindowNode> pattern;
+
+    /**
+     * Builds the pattern window -> project{numProjects} -> window.
+     *
+     * @param numProjects the exact number of ProjectNodes expected between the two windows
+     */
+    protected ManipulateAdjacentWindowsOverProjects(int numProjects) {
+      // Innermost pattern: the descendant WindowNode.
+      PropertyPattern<PlanNode, ?, ?> childPattern =
+          source().matching(window().capturedAs(childCapture));
+      ImmutableList.Builder<Capture<ProjectNode>> projectCapturesBuilder = 
ImmutableList.builder();
+      // Each iteration wraps the pattern built so far in one more ProjectNode, so the captures are
+      // recorded bottom first (the project directly above the descendant window comes first).
+      for (int i = 0; i < numProjects; ++i) {
+        Capture<ProjectNode> projectCapture = newCapture();
+        projectCapturesBuilder.add(projectCapture);
+        childPattern = 
source().matching(project().capturedAs(projectCapture).with(childPattern));
+      }
+      this.projectCaptures = projectCapturesBuilder.build();
+      this.pattern = window().with(childPattern);
+    }
+
+    @Override
+    public Pattern<WindowNode> getPattern() {
+      return pattern;
+    }
+
+    /**
+     * Pulls the captured descendant window above the captured projects and, if that succeeds,
+     * delegates to {@link #manipulateAdjacentWindowNodes}. Returns an empty result when either
+     * step yields nothing.
+     */
+    @Override
+    public Result apply(WindowNode parent, Captures captures, Context context) 
{
+      // Pulling the descendant WindowNode above projects is done as a part of this rule, as
+      // opposed to in a separate rule, because that pullup is not useful on its own, and could be
+      // undone by other rules.
+      // For example, a rule could insert a project-off node between adjacent WindowNodes that use
+      // different input symbols.
+      List<ProjectNode> projects =
+          
projectCaptures.stream().map(captures::get).collect(toImmutableList());
+
+      return pullWindowNodeAboveProjects(captures.get(childCapture), projects)
+          .flatMap(newChild -> manipulateAdjacentWindowNodes(parent, newChild, 
context))
+          .map(Result::ofPlanNode)
+          .orElse(Result.empty());
+    }
+
+    /**
+     * Rewrites two adjacent window nodes.
+     *
+     * @param parent the matched top WindowNode
+     * @param child the descendant WindowNode after it has been pulled above the projects
+     * @param context the rule context
+     * @return the replacement plan, or empty if this rule does not apply
+     */
+    protected abstract Optional<PlanNode> manipulateAdjacentWindowNodes(
+        WindowNode parent, WindowNode child, Context context);
+
+    /**
+     * Looks for the pattern (ProjectNode*)WindowNode, and rewrites it to WindowNode(ProjectNode*),
+     * returning an empty option if it can't rewrite the projects, for example because they rely on
+     * the output of the WindowNode.
+     *
+     * @param target the WindowNode to hoist
+     * @param projects the nodes above the target, bottom first.
+     * @return the hoisted WindowNode with the rewritten projects as its child, or empty
+     */
+    protected static Optional<WindowNode> pullWindowNodeAboveProjects(
+        WindowNode target, List<ProjectNode> projects) {
+      // Nothing to hoist over: the target is already directly below the parent.
+      if (projects.isEmpty()) {
+        return Optional.of(target);
+      }
+
+      PlanNode targetChild = target.getChild();
+
+      Set<Symbol> targetInputs = 
ImmutableSet.copyOf(targetChild.getOutputSymbols());
+      Set<Symbol> targetOutputs = 
ImmutableSet.copyOf(target.getOutputSymbols());
+
+      // Chain of rewritten projects, rebuilt bottom-up on top of the target's original child.
+      PlanNode newTargetChild = targetChild;
+
+      for (ProjectNode project : projects) {
+        Set<Symbol> newTargetChildOutputs = 
ImmutableSet.copyOf(newTargetChild.getOutputSymbols());
+
+        // The only kind of use of the output of the target that we can safely ignore is a simple
+        // identity propagation.
+        // The target node, when hoisted above the projections, will provide the symbols directly.
+        Map<Symbol, Expression> assignmentsWithoutTargetOutputIdentities =
+            Maps.filterKeys(
+                project.getAssignments().getMap(),
+                output ->
+                    !(project.getAssignments().isIdentity(output)
+                        && targetOutputs.contains(output)));
+
+        if 
(targetInputs.stream().anyMatch(assignmentsWithoutTargetOutputIdentities::containsKey))
 {
+          // Redefinition of an input to the target -- can't handle this case.
+          return Optional.empty();
+        }
+
+        // Keep the remaining assignments and pass every input of the target through unchanged so
+        // the hoisted target still finds them.
+        Assignments newAssignments =
+            Assignments.builder()
+                .putAll(assignmentsWithoutTargetOutputIdentities)
+                .putIdentities(targetInputs)
+                .build();
+
+        if (!newTargetChildOutputs.containsAll(
+            SymbolsExtractor.extractUnique(newAssignments.getExpressions()))) {
+          // Projection uses an output of the target -- can't move the target above this
+          // projection.
+          return Optional.empty();
+        }
+
+        newTargetChild = new ProjectNode(project.getPlanNodeId(), 
newTargetChild, newAssignments);
+      }
+
+      WindowNode newTarget = (WindowNode) 
target.replaceChildren(ImmutableList.of(newTargetChild));
+      Set<Symbol> newTargetOutputs = 
ImmutableSet.copyOf(newTarget.getOutputSymbols());
+      if (!newTargetOutputs.containsAll(projects.get(projects.size() - 
1).getOutputSymbols())) {
+        // The new target node is hiding some of the projections, which makes this rewrite
+        // incorrect.
+        return Optional.empty();
+      }
+      return Optional.of(newTarget);
+    }
+  }
+
+  /**
+   * Merges two adjacent window nodes that share the same specification into a single WindowNode
+   * carrying the window functions of both.
+   */
+  public static class MergeAdjacentWindowsOverProjects
+      extends ManipulateAdjacentWindowsOverProjects {
+    public MergeAdjacentWindowsOverProjects(int numProjects) {
+      super(numProjects);
+    }
+
+    /**
+     * Returns the merged window (restricted to the parent's outputs when needed), or empty when the
+     * specifications differ or the parent depends on symbols created by the child.
+     */
+    @Override
+    protected Optional<PlanNode> manipulateAdjacentWindowNodes(
+        WindowNode parent, WindowNode child, Context context) {
+      boolean sameSpecification = child.getSpecification().equals(parent.getSpecification());
+      if (!sameSpecification || dependsOn(parent, child)) {
+        return Optional.empty();
+      }
+
+      // Parent functions first, then the child's; buildOrThrow rejects duplicate output symbols.
+      ImmutableMap<Symbol, WindowNode.Function> combinedFunctions =
+          ImmutableMap.<Symbol, WindowNode.Function>builder()
+              .putAll(parent.getWindowFunctions())
+              .putAll(child.getWindowFunctions())
+              .buildOrThrow();
+
+      // The merged node takes the parent's identity and properties but reads from the child's
+      // source.
+      WindowNode merged =
+          new WindowNode(
+              parent.getPlanNodeId(),
+              child.getChild(),
+              parent.getSpecification(),
+              combinedFunctions,
+              parent.getHashSymbol(),
+              parent.getPrePartitionedInputs(),
+              parent.getPreSortedOrderPrefix());
+
+      PlanNode rewritten =
+          restrictOutputs(
+                  context.getIdAllocator(), merged, ImmutableSet.copyOf(parent.getOutputSymbols()))
+              .orElse(merged);
+      return Optional.of(rewritten);
+    }
+  }
+
+  /**
+   * Reorders two adjacent window nodes so that they end up in a deterministic order defined by
+   * their partitioning, then their ordering, then their plan node ids.
+   */
+  public static class SwapAdjacentWindowsBySpecifications
+      extends ManipulateAdjacentWindowsOverProjects {
+    public SwapAdjacentWindowsBySpecifications(int numProjects) {
+      super(numProjects);
+    }
+
+    /**
+     * Transposes parent and child when the parent sorts before the child and does not depend on
+     * symbols created by the child; otherwise returns empty.
+     */
+    @Override
+    protected Optional<PlanNode> manipulateAdjacentWindowNodes(
+        WindowNode parent, WindowNode child, Context context) {
+      if (compare(parent, child) >= 0 || dependsOn(parent, child)) {
+        return Optional.empty();
+      }
+
+      PlanNode swapped = transpose(parent, child);
+      PlanNode rewritten =
+          restrictOutputs(
+                  context.getIdAllocator(), swapped, ImmutableSet.copyOf(parent.getOutputSymbols()))
+              .orElse(swapped);
+      return Optional.of(rewritten);
+    }
+
+    /** Compares by PARTITION BY, then ORDER BY, then the textual plan node id. */
+    private static int compare(WindowNode o1, WindowNode o2) {
+      int byPartition = comparePartitionBy(o1, o2);
+      if (byPartition != 0) {
+        return byPartition;
+      }
+
+      int byOrdering = compareOrderBy(o1, o2);
+      if (byOrdering != 0) {
+        return byOrdering;
+      }
+
+      // If PartitionBy and OrderBy clauses are identical, let's establish an arbitrary order to
+      // prevent non-deterministic results of swapping WindowNodes in such a case
+      String firstId = o1.getPlanNodeId().toString();
+      String secondId = o2.getPlanNodeId().toString();
+      return firstId.compareTo(secondId);
+    }
+
+    /** Element-wise comparison of the partitioning symbols; on a common prefix the longer wins. */
+    private static int comparePartitionBy(WindowNode o1, WindowNode o2) {
+      Iterator<Symbol> first = o1.getSpecification().getPartitionBy().iterator();
+      Iterator<Symbol> second = o2.getSpecification().getPartitionBy().iterator();
+
+      while (first.hasNext() && second.hasNext()) {
+        int symbolOrder = first.next().compareTo(second.next());
+        if (symbolOrder != 0) {
+          return symbolOrder;
+        }
+      }
+
+      // At most one iterator still has elements here: 1 if it is the first, -1 if the second.
+      return Boolean.compare(first.hasNext(), second.hasNext());
+    }
+
+    /**
+     * Compares the ordering schemes: an absent scheme sorts before a present one; otherwise the
+     * symbols and their sort orders are compared element-wise, and on a common prefix the longer
+     * scheme wins.
+     */
+    private static int compareOrderBy(WindowNode o1, WindowNode o2) {
+      Optional<OrderingScheme> firstScheme = o1.getSpecification().getOrderingScheme();
+      Optional<OrderingScheme> secondScheme = o2.getSpecification().getOrderingScheme();
+      if (!firstScheme.isPresent() || !secondScheme.isPresent()) {
+        // 0 when both are absent, 1 when only the first is present, -1 when only the second is.
+        return Boolean.compare(firstScheme.isPresent(), secondScheme.isPresent());
+      }
+
+      OrderingScheme firstOrdering = firstScheme.get();
+      OrderingScheme secondOrdering = secondScheme.get();
+      Iterator<Symbol> first = firstOrdering.getOrderBy().iterator();
+      Iterator<Symbol> second = secondOrdering.getOrderBy().iterator();
+
+      while (first.hasNext() && second.hasNext()) {
+        Symbol firstSymbol = first.next();
+        Symbol secondSymbol = second.next();
+
+        int symbolOrder = firstSymbol.compareTo(secondSymbol);
+        if (symbolOrder != 0) {
+          return symbolOrder;
+        }
+
+        int directionOrder =
+            firstOrdering.getOrdering(firstSymbol).compareTo(secondOrdering.getOrdering(secondSymbol));
+        if (directionOrder != 0) {
+          return directionOrder;
+        }
+      }
+
+      return Boolean.compare(first.hasNext(), second.hasNext());
+    }
+  }
+
+  /**
+   * Tells whether the parent window uses any symbol created by the child window, either in its
+   * PARTITION BY list, in its ORDER BY list, or inside one of its window functions.
+   */
+  private static boolean dependsOn(WindowNode parent, WindowNode child) {
+    boolean partitionUsesChild =
+        parent.getSpecification().getPartitionBy().stream()
+            .anyMatch(child.getCreatedSymbols()::contains);
+    if (partitionUsesChild) {
+      return true;
+    }
+
+    Optional<OrderingScheme> ordering = parent.getSpecification().getOrderingScheme();
+    if (ordering.isPresent()
+        && ordering.get().getOrderBy().stream().anyMatch(child.getCreatedSymbols()::contains)) {
+      return true;
+    }
+
+    return parent.getWindowFunctions().values().stream()
+        .map(SymbolsExtractor::extractUnique)
+        .flatMap(Collection::stream)
+        .anyMatch(child.getCreatedSymbols()::contains);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
new file mode 100644
index 00000000000..ad388c09bda
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
@@ -0,0 +1,147 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValuesNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Rewrites a FilterNode on top of a single-function ranking WindowNode (as recognised by
+ * toTopNRankingType) into a TopKRankingNode when the filter is a comparison {@code rankingSymbol
+ * < / <= / = integerLiteral}. The filter itself is kept only for the {@code =} case.
+ */
+public class PushDownFilterIntoWindow implements Rule<FilterNode> {
+  private static final Capture<WindowNode> childCapture = newCapture();
+
+  private final Pattern<FilterNode> pattern;
+  private final PlannerContext plannerContext;
+
+  public PushDownFilterIntoWindow(PlannerContext plannerContext) {
+    this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
+    this.pattern =
+        filter()
+            .with(
+                source()
+                    .matching(
+                        window()
+                            .matching(
+                                window -> window.getSpecification().getOrderingScheme().isPresent())
+                            .matching(window -> toTopNRankingType(window).isPresent())
+                            .capturedAs(childCapture)));
+  }
+
+  @Override
+  public Pattern<FilterNode> getPattern() {
+    return pattern;
+  }
+
+  /**
+   * Replaces the matched window with a TopKRankingNode bounded by the filter's upper bound.
+   *
+   * <ul>
+   *   <li>no usable upper bound: the plan is left unchanged;
+   *   <li>upper bound of zero: the whole subplan becomes an empty ValuesNode;
+   *   <li>otherwise: a TopKRankingNode, wrapped in the original filter when it is still needed.
+   * </ul>
+   */
+  @Override
+  public Result apply(FilterNode node, Captures captures, Context context) {
+    WindowNode windowNode = captures.get(childCapture);
+    // The pattern only matches windows for which toTopNRankingType is present, so the window has
+    // exactly one function and rankingType.get() below is safe.
+    Optional<TopKRankingNode.RankingType> rankingType = toTopNRankingType(windowNode);
+    Symbol rankingSymbol = getOnlyElement(windowNode.getWindowFunctions().keySet());
+
+    OptionalInt upperBound = extractUpperBoundFromComparison(node.getPredicate(), rankingSymbol);
+
+    if (!upperBound.isPresent()) {
+      return Result.empty();
+    }
+
+    if (upperBound.getAsInt() <= 0) {
+      return Result.ofPlanNode(
+          new ValuesNode(node.getPlanNodeId(), node.getOutputSymbols(), ImmutableList.of()));
+    }
+
+    TopKRankingNode newSource =
+        new TopKRankingNode(
+            windowNode.getPlanNodeId(),
+            windowNode.getChildren(),
+            windowNode.getSpecification(),
+            rankingType.get(),
+            rankingSymbol,
+            upperBound.getAsInt(),
+            false);
+
+    if (needToKeepFilter(node.getPredicate(), rankingSymbol, upperBound.getAsInt())) {
+      return Result.ofPlanNode(
+          new FilterNode(node.getPlanNodeId(), newSource, node.getPredicate()));
+    }
+
+    return Result.ofPlanNode(newSource);
+  }
+
+  /**
+   * Derives an inclusive upper bound for the ranking symbol from a predicate of the form {@code
+   * rankingSymbol <op> literal}.
+   *
+   * @param predicate the filter predicate
+   * @param rankingSymbol the symbol produced by the ranking window function
+   * @return the inclusive upper bound, clamped to 0 when the literal is not positive (no row can
+   *     match); empty when the predicate has another shape, the operator is not {@code <}, {@code
+   *     <=} or {@code =}, the literal is not an integral number, or the bound does not fit an int
+   */
+  private OptionalInt extractUpperBoundFromComparison(Expression predicate, Symbol rankingSymbol) {
+    if (!(predicate instanceof ComparisonExpression)) {
+      return OptionalInt.empty();
+    }
+
+    ComparisonExpression comparison = (ComparisonExpression) predicate;
+    Expression left = comparison.getLeft();
+    Expression right = comparison.getRight();
+
+    if (!(left instanceof SymbolReference) || !(right instanceof Literal)) {
+      return OptionalInt.empty();
+    }
+
+    SymbolReference symbolRef = (SymbolReference) left;
+    if (!symbolRef.getName().equals(rankingSymbol.getName())) {
+      return OptionalInt.empty();
+    }
+
+    Object value = ((Literal) right).getTsValue();
+    // Only integral literals are handled. Truncating a fractional literal with longValue() would
+    // turn e.g. "rn < 2.5" into the bound 1 and, because the filter is dropped for "<", lose the
+    // rows with rank 2.
+    boolean integral =
+        value instanceof Long
+            || value instanceof Integer
+            || value instanceof Short
+            || value instanceof Byte;
+    if (!integral) {
+      return OptionalInt.empty();
+    }
+
+    long constantValue = ((Number) value).longValue();
+
+    long inclusiveBound;
+    switch (comparison.getOperator()) {
+      case LESS_THAN:
+        // Guard the decrement against wrapping around at Long.MIN_VALUE.
+        inclusiveBound = constantValue == Long.MIN_VALUE ? constantValue : constantValue - 1;
+        break;
+      case LESS_THAN_OR_EQUAL:
+      case EQUAL:
+        inclusiveBound = constantValue;
+        break;
+      default:
+        return OptionalInt.empty();
+    }
+
+    if (inclusiveBound > Integer.MAX_VALUE) {
+      // The bound cannot be represented as an int; skip the rewrite instead of failing the
+      // planning with an ArithmeticException.
+      return OptionalInt.empty();
+    }
+    // Every non-positive bound is handled the same way by apply() (empty result), so clamp to 0;
+    // this also keeps very negative literals inside the int range.
+    return OptionalInt.of(toIntExact(Math.max(inclusiveBound, 0L)));
+  }
+
+  /**
+   * Tells whether the original filter must stay above the new TopKRankingNode. Only {@code <} and
+   * {@code <=} comparisons are dropped; every other predicate, including {@code =}, is kept.
+   *
+   * @param predicate the filter predicate
+   * @param rankingSymbol currently unused
+   * @param upperBound currently unused
+   */
+  private boolean needToKeepFilter(Expression predicate, Symbol rankingSymbol, int upperBound) {
+    if (!(predicate instanceof ComparisonExpression)) {
+      return true;
+    }
+
+    ComparisonExpression.Operator operator = ((ComparisonExpression) predicate).getOperator();
+    return operator != ComparisonExpression.Operator.LESS_THAN_OR_EQUAL
+        && operator != ComparisonExpression.Operator.LESS_THAN;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
new file mode 100644
index 00000000000..94e52c5caa5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
@@ -0,0 +1,81 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Optional;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.lang.Math.toIntExact;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode.RankingType.ROW_NUMBER;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Rewrites a LimitNode on top of a single-function ranking WindowNode (as recognised by
+ * toTopNRankingType) into a TopKRankingNode that uses the limit count as its ranking bound.
+ */
+public class PushDownLimitIntoWindow implements Rule<LimitNode> {
+  private static final Capture<WindowNode> childCapture = newCapture();
+  private final Pattern<LimitNode> pattern;
+
+  public PushDownLimitIntoWindow() {
+    Pattern<WindowNode> rankingWindow =
+        window()
+            .matching(window -> window.getSpecification().getOrderingScheme().isPresent())
+            .matching(window -> toTopNRankingType(window).isPresent())
+            .capturedAs(childCapture);
+    this.pattern =
+        limit()
+            .matching(PushDownLimitIntoWindow::isPushableLimit)
+            .with(source().matching(rankingWindow));
+  }
+
+  /**
+   * Accepts limits without ties, with a non-zero count that fits an int, and that do not require
+   * pre-sorted inputs.
+   */
+  private static boolean isPushableLimit(LimitNode limit) {
+    return !limit.isWithTies()
+        && limit.getCount() != 0
+        && limit.getCount() <= Integer.MAX_VALUE
+        && !limit.requiresPreSortedInputs();
+  }
+
+  // TODO: isOptimizeTopNRanking(session);
+  @Override
+  public boolean isEnabled(SessionInfo session) {
+    return true;
+  }
+
+  @Override
+  public Pattern<LimitNode> getPattern() {
+    return pattern;
+  }
+
+  /**
+   * Replaces the window by a TopKRankingNode. The limit is dropped only for an unpartitioned
+   * ROW_NUMBER; in every other case it is kept on top of the new node.
+   */
+  @Override
+  public Result apply(LimitNode node, Captures captures, Context context) {
+    WindowNode windowNode = captures.get(childCapture);
+
+    // Present by construction: the pattern only matches windows with a ranking type.
+    Optional<TopKRankingNode.RankingType> rankingType = toTopNRankingType(windowNode);
+
+    int maxRanking = toIntExact(node.getCount());
+    TopKRankingNode topKRanking =
+        new TopKRankingNode(
+            windowNode.getPlanNodeId(),
+            windowNode.getChildren(),
+            windowNode.getSpecification(),
+            rankingType.get(),
+            getOnlyElement(windowNode.getWindowFunctions().keySet()),
+            maxRanking,
+            false);
+
+    boolean limitStillNeeded =
+        rankingType.get() != ROW_NUMBER || !windowNode.getSpecification().getPartitionBy().isEmpty();
+    if (limitStillNeeded) {
+      return Result.ofPlanNode(replaceChildren(node, ImmutableList.of(topKRanking)));
+    }
+    return Result.ofPlanNode(topKRanking);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java
new file mode 100644
index 00000000000..6a6b18c8a9f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+
+/**
+ * Placeholder rule matching every WindowNode. The intended rewrite (replacing a window over an
+ * empty input with an empty ValuesNode) is still commented out below, so the rule currently never
+ * changes the plan.
+ */
+public class RemoveRedundantWindow implements Rule<WindowNode> {
+  private static final Pattern<WindowNode> PATTERN = window();
+
+  @Override
+  public Pattern<WindowNode> getPattern() {
+    return PATTERN;
+  }
+
+  /** Always returns an empty result, i.e. leaves the plan untouched. */
+  @Override
+  public Result apply(WindowNode window, Captures captures, Context context) {
+    // TODO: enable once an emptiness check for the child is available:
+    //    if (isEmpty(window.getChild(), context.getLookup())) {
+    //      return Result.ofPlanNode(new ValuesNode(window.getPlanNodeId(),
+    // window.getOutputSymbols(), ImmutableList.of()));
+    //    }
+    return Result.empty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java
new file mode 100644
index 00000000000..4c777806800
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+
+/**
+ * Matches a WindowNode whose only window function is an argument-less {@code row_number} and whose
+ * specification has no ordering scheme. The replacement by a dedicated row-number node is not
+ * implemented yet, so the rule currently leaves the plan unchanged.
+ */
+public class ReplaceWindowWithRowNumber implements Rule<WindowNode> {
+  private final Pattern<WindowNode> pattern;
+
+  /**
+   * @param metadata currently unused
+   */
+  public ReplaceWindowWithRowNumber(Metadata metadata) {
+    this.pattern =
+        window()
+            .matching(
+                window -> {
+                  if (window.getWindowFunctions().size() != 1) {
+                    return false;
+                  }
+                  BoundSignature signature =
+                      getOnlyElement(window.getWindowFunctions().values())
+                          .getResolvedFunction()
+                          .getSignature();
+                  return signature.getArgumentTypes().isEmpty()
+                      && signature.getName().equals("row_number");
+                })
+            .matching(window -> !window.getSpecification().getOrderingScheme().isPresent());
+  }
+
+  @Override
+  public Pattern<WindowNode> getPattern() {
+    return pattern;
+  }
+
+  /**
+   * Reports "no change" for every match.
+   *
+   * @return always {@code Result.empty()}
+   */
+  @Override
+  public Result apply(WindowNode node, Captures captures, Context context) {
+    // TODO: replace the matched window with a RowNumberNode.
+    // Until then return an empty Result rather than null: the other rules in this package signal
+    // "no change" with Result.empty(), and a null Result gives callers nothing to inspect.
+    return Result.empty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
index 7447508ab1e..5845ffd4221 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
@@ -20,10 +20,13 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
 
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 
 import com.google.common.collect.ImmutableList;
@@ -37,12 +40,11 @@ import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode.RankingType.RANK;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode.RankingType.ROW_NUMBER;
 
 final class Util {
-  // private static final CatalogSchemaFunctionName ROW_NUMBER_NAME =
-  // builtinFunctionName("row_number");
-  // private static final CatalogSchemaFunctionName RANK_NAME = 
builtinFunctionName("rank");
-
   private Util() {}
 
   /**
@@ -121,22 +123,23 @@ final class Util {
     return Optional.of(node.replaceChildren(newChildrenBuilder.build()));
   }
 
-  /*public static Optional<RankingType> toTopNRankingType(WindowNode node)
-  {
-      if (node.getWindowFunctions().size() != 1 || 
node.getOrderingScheme().isEmpty()) {
-          return Optional.empty();
-      }
-
-      BoundSignature signature = 
getOnlyElement(node.getWindowFunctions().values()).getResolvedFunction().getSignature();
-      if (!signature.getArgumentTypes().isEmpty()) {
-          return Optional.empty();
-      }
-      if (signature.getName().equals(ROW_NUMBER_NAME)) {
-          return Optional.of(ROW_NUMBER);
-      }
-      if (signature.getName().equals(RANK_NAME)) {
-          return Optional.of(RANK);
-      }
+  /**
+   * Maps a window node to the ranking type usable for a top-K rewrite.
+   *
+   * @param node the window node to inspect
+   * @return ROW_NUMBER or RANK when the node has exactly one window function, an ordering scheme,
+   *     and that function takes no arguments and is named "row_number" or "rank" respectively;
+   *     empty otherwise
+   */
+  public static Optional<TopKRankingNode.RankingType> 
toTopNRankingType(WindowNode node) {
+    if (node.getWindowFunctions().size() != 1
+        || !node.getSpecification().getOrderingScheme().isPresent()) {
+      return Optional.empty();
+    }
+
+    BoundSignature signature =
+        
getOnlyElement(node.getWindowFunctions().values()).getResolvedFunction().getSignature();
+    if (!signature.getArgumentTypes().isEmpty()) {
       return Optional.empty();
-  }*/
+    }
+    if (signature.getName().equals("row_number")) {
+      return Optional.of(ROW_NUMBER);
+    }
+    if (signature.getName().equals("rank")) {
+      return Optional.of(RANK);
+    }
+    return Optional.empty();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java
new file mode 100644
index 00000000000..9c8c3408f4c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java
@@ -0,0 +1,177 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Plan node of the table model that describes a row-number computation over its single child.
+ *
+ * <p>The node stores the partitioning symbols, the symbol that carries the generated row number,
+ * an optional per-partition row limit and an order-sensitivity flag. All of these attributes are
+ * final; {@link #clone()} and {@link #replaceChildren(List)} create new instances.
+ */
+public class RowNumberNode extends SingleChildProcessNode {
+  private final List<Symbol> partitionBy;
+
+  /*
+   * This flag indicates that the node depends on the row order established by the subplan.
+   * It is taken into account while adding local exchanges to the plan, ensuring that sorted order
+   * of data will be respected.
+   * Note: if the subplan doesn't produce sorted output, this flag doesn't change the resulting
+   * plan.
+   * Note: this flag is used for planning of queries involving ORDER BY and OFFSET.
+   */
+  private final boolean orderSensitive;
+
+  private final Optional<Integer> maxRowCountPerPartition;
+  private final Symbol rowNumberSymbol;
+
+  /**
+   * Creates a node without attaching a child.
+   *
+   * @param id plan node id
+   * @param partitionBy partitioning symbols; copied into an immutable list
+   * @param orderSensitive whether the node depends on the row order of its input
+   * @param rowNumberSymbol symbol that carries the row number
+   * @param maxRowCountPerPartition optional per-partition row limit
+   */
+  public RowNumberNode(
+      PlanNodeId id,
+      List<Symbol> partitionBy,
+      boolean orderSensitive,
+      Symbol rowNumberSymbol,
+      Optional<Integer> maxRowCountPerPartition) {
+    super(id);
+
+    this.partitionBy = ImmutableList.copyOf(partitionBy);
+    this.orderSensitive = orderSensitive;
+    this.rowNumberSymbol = rowNumberSymbol;
+    this.maxRowCountPerPartition = maxRowCountPerPartition;
+  }
+
+  /**
+   * Creates a node with the given child.
+   *
+   * @param id plan node id
+   * @param child the single input of this node
+   * @param partitionBy partitioning symbols; copied into an immutable list
+   * @param orderSensitive whether the node depends on the row order of its input
+   * @param rowNumberSymbol symbol that carries the row number
+   * @param maxRowCountPerPartition optional per-partition row limit
+   */
+  public RowNumberNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<Symbol> partitionBy,
+      boolean orderSensitive,
+      Symbol rowNumberSymbol,
+      Optional<Integer> maxRowCountPerPartition) {
+    super(id, child);
+
+    this.partitionBy = ImmutableList.copyOf(partitionBy);
+    this.orderSensitive = orderSensitive;
+    this.rowNumberSymbol = rowNumberSymbol;
+    this.maxRowCountPerPartition = maxRowCountPerPartition;
+  }
+
+  /** Returns a copy of this node with the same id and attributes but no child attached. */
+  @Override
+  public PlanNode clone() {
+    return new RowNumberNode(
+        getPlanNodeId(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRowNumber(this, context);
+  }
+
+  /** Not supported by this node; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns the immutable list of partitioning symbols. */
+  public List<Symbol> getPartitionBy() {
+    return partitionBy;
+  }
+
+  /** Returns whether this node depends on the row order established by its input. */
+  public boolean isOrderSensitive() {
+    return orderSensitive;
+  }
+
+  /** Returns the symbol that carries the row number. */
+  public Symbol getRowNumberSymbol() {
+    return rowNumberSymbol;
+  }
+
+  /** Returns the optional per-partition row limit. */
+  public Optional<Integer> getMaxRowCountPerPartition() {
+    return maxRowCountPerPartition;
+  }
+
+  /**
+   * Writes the node type tag followed by: partition symbol count, each partition symbol, the
+   * order-sensitivity flag, the row-number symbol, a presence flag and, when present, the
+   * per-partition row limit.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_ROW_NUMBER_NODE.serialize(byteBuffer);
+    ReadWriteIOUtils.write(partitionBy.size(), byteBuffer);
+    for (Symbol symbol : partitionBy) {
+      Symbol.serialize(symbol, byteBuffer);
+    }
+    ReadWriteIOUtils.write(orderSensitive, byteBuffer);
+    Symbol.serialize(rowNumberSymbol, byteBuffer);
+    if (maxRowCountPerPartition.isPresent()) {
+      ReadWriteIOUtils.write(true, byteBuffer);
+      ReadWriteIOUtils.write(maxRowCountPerPartition.get(), byteBuffer);
+    } else {
+      ReadWriteIOUtils.write(false, byteBuffer);
+    }
+  }
+
+  /** Stream counterpart of {@link #serializeAttributes(ByteBuffer)}; same field order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_ROW_NUMBER_NODE.serialize(stream);
+    ReadWriteIOUtils.write(partitionBy.size(), stream);
+    for (Symbol symbol : partitionBy) {
+      Symbol.serialize(symbol, stream);
+    }
+    ReadWriteIOUtils.write(orderSensitive, stream);
+    Symbol.serialize(rowNumberSymbol, stream);
+    if (maxRowCountPerPartition.isPresent()) {
+      ReadWriteIOUtils.write(true, stream);
+      ReadWriteIOUtils.write(maxRowCountPerPartition.get(), stream);
+    } else {
+      ReadWriteIOUtils.write(false, stream);
+    }
+  }
+
+  /**
+   * Rebuilds a node from the attribute layout written by {@code serializeAttributes}, followed by
+   * the plan node id. The node type tag is not read here.
+   *
+   * @param buffer buffer positioned at the partition symbol count
+   * @return the deserialized node, without a child
+   */
+  public static RowNumberNode deserialize(ByteBuffer buffer) {
+    int partitionBySize = ReadWriteIOUtils.readInt(buffer);
+    ImmutableList.Builder<Symbol> partitionBy = ImmutableList.builder();
+    for (int i = 0; i < partitionBySize; i++) {
+      partitionBy.add(Symbol.deserialize(buffer));
+    }
+    boolean orderSensitive = ReadWriteIOUtils.readBoolean(buffer);
+    Symbol rowNumberSymbol = Symbol.deserialize(buffer);
+    Optional<Integer> maxRowCountPerPartition;
+    if (ReadWriteIOUtils.readBoolean(buffer)) {
+      maxRowCountPerPartition = Optional.of(ReadWriteIOUtils.readInt(buffer));
+    } else {
+      maxRowCountPerPartition = Optional.empty();
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+    return new RowNumberNode(
+        planNodeId, partitionBy.build(), orderSensitive, rowNumberSymbol, maxRowCountPerPartition);
+  }
+
+  /**
+   * Returns a single-element list holding the row-number symbol.
+   *
+   * <p>NOTE(review): the child's output symbols are not included here; confirm whether callers
+   * expect them to be passed through as well.
+   */
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return Collections.singletonList(rowNumberSymbol);
+  }
+
+  /**
+   * Returns a new node with the same id and attributes whose child is the single given node.
+   *
+   * @throws IllegalArgumentException if {@code newChildren} does not contain exactly one node
+   */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(newChildren.size() == 1, "wrong number of new children");
+    return new RowNumberNode(
+        id,
+        newChildren.get(0),
+        partitionBy,
+        orderSensitive,
+        rowNumberSymbol,
+        maxRowCountPerPartition);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    RowNumberNode that = (RowNumberNode) o;
+    // List.equals compares size and elements in order, which is what the manual loop did.
+    return orderSensitive == that.orderSensitive
+        && partitionBy.equals(that.partitionBy)
+        && Objects.equal(rowNumberSymbol, that.rowNumberSymbol)
+        && Objects.equal(maxRowCountPerPartition, that.maxRowCountPerPartition);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        super.hashCode(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition);
+  }
+
+  @Override
+  public String toString() {
+    return "RowNumber-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
new file mode 100644
index 00000000000..1e5934f6ab8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
@@ -0,0 +1,168 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.base.Objects;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Plan node of the table model that describes a top-K ranking computation.
+ *
+ * <p>The node stores the data organization specification, the ranking type, the symbol that
+ * carries the ranking value, the maximum ranking kept per partition and a {@code partial} flag.
+ * All of these attributes are final; {@link #clone()} and {@link #replaceChildren(List)} create
+ * new instances.
+ */
+public class TopKRankingNode extends MultiChildProcessNode {
+  /**
+   * Ranking flavours. The constant order is part of the serialized format because the ordinal is
+   * what gets written and read back, so new constants must only be appended.
+   */
+  public enum RankingType {
+    ROW_NUMBER,
+    RANK,
+    DENSE_RANK
+  }
+
+  private final DataOrganizationSpecification specification;
+  private final RankingType rankingType;
+  private final Symbol rankingSymbol;
+  private final int maxRankingPerPartition;
+  private final boolean partial;
+
+  /**
+   * Creates a node without attaching children.
+   *
+   * @param id plan node id
+   * @param specification data organization specification of the ranking
+   * @param rankingType which ranking function is computed
+   * @param rankingSymbol symbol that carries the ranking value
+   * @param maxRankingPerPartition maximum ranking kept per partition
+   * @param partial value of the partial flag
+   */
+  public TopKRankingNode(
+      PlanNodeId id,
+      DataOrganizationSpecification specification,
+      RankingType rankingType,
+      Symbol rankingSymbol,
+      int maxRankingPerPartition,
+      boolean partial) {
+    super(id);
+
+    this.specification = specification;
+    this.rankingType = rankingType;
+    this.rankingSymbol = rankingSymbol;
+    this.maxRankingPerPartition = maxRankingPerPartition;
+    this.partial = partial;
+  }
+
+  /**
+   * Creates a node with the given children.
+   *
+   * @param id plan node id
+   * @param children inputs of this node
+   * @param specification data organization specification of the ranking
+   * @param rankingType which ranking function is computed
+   * @param rankingSymbol symbol that carries the ranking value
+   * @param maxRankingPerPartition maximum ranking kept per partition
+   * @param partial value of the partial flag
+   */
+  public TopKRankingNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      DataOrganizationSpecification specification,
+      RankingType rankingType,
+      Symbol rankingSymbol,
+      int maxRankingPerPartition,
+      boolean partial) {
+    super(id, children);
+
+    this.specification = specification;
+    this.rankingType = rankingType;
+    this.rankingSymbol = rankingSymbol;
+    this.maxRankingPerPartition = maxRankingPerPartition;
+    this.partial = partial;
+  }
+
+  /** Returns a copy of this node with the same id and attributes but no children attached. */
+  @Override
+  public PlanNode clone() {
+    return new TopKRankingNode(
+        getPlanNodeId(),
+        specification,
+        rankingType,
+        rankingSymbol,
+        maxRankingPerPartition,
+        partial);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitTopKRanking(this, context);
+  }
+
+  /** Not supported by this node; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns the data organization specification of the ranking. */
+  public DataOrganizationSpecification getSpecification() {
+    return specification;
+  }
+
+  /** Returns which ranking function this node computes. */
+  public RankingType getRankingType() {
+    return rankingType;
+  }
+
+  /** Returns the symbol that carries the ranking value. */
+  public Symbol getRankingSymbol() {
+    return rankingSymbol;
+  }
+
+  /** Returns the maximum ranking kept per partition. */
+  public int getMaxRankingPerPartition() {
+    return maxRankingPerPartition;
+  }
+
+  /** Returns the value of the partial flag. */
+  public boolean isPartial() {
+    return partial;
+  }
+
+  /**
+   * Writes the node type tag followed by: the specification, the ranking type ordinal, the
+   * ranking symbol, the maximum ranking per partition and the partial flag.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_TOPK_RANKING_NODE.serialize(byteBuffer);
+    specification.serialize(byteBuffer);
+    ReadWriteIOUtils.write(rankingType.ordinal(), byteBuffer);
+    Symbol.serialize(rankingSymbol, byteBuffer);
+    ReadWriteIOUtils.write(maxRankingPerPartition, byteBuffer);
+    ReadWriteIOUtils.write(partial, byteBuffer);
+  }
+
+  /** Stream counterpart of {@link #serializeAttributes(ByteBuffer)}; same field order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_TOPK_RANKING_NODE.serialize(stream);
+    specification.serialize(stream);
+    ReadWriteIOUtils.write(rankingType.ordinal(), stream);
+    Symbol.serialize(rankingSymbol, stream);
+    ReadWriteIOUtils.write(maxRankingPerPartition, stream);
+    ReadWriteIOUtils.write(partial, stream);
+  }
+
+  /**
+   * Rebuilds a node from the attribute layout written by {@code serializeAttributes}, followed by
+   * the plan node id. The node type tag is not read here.
+   *
+   * @param byteBuffer buffer positioned at the serialized specification
+   * @return the deserialized node, without children
+   */
+  public static TopKRankingNode deserialize(ByteBuffer byteBuffer) {
+    DataOrganizationSpecification specification =
+        DataOrganizationSpecification.deserialize(byteBuffer);
+    // The ranking type travels as its ordinal; see the note on RankingType.
+    RankingType rankingType = RankingType.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    Symbol rankingSymbol = Symbol.deserialize(byteBuffer);
+    int maxRankingPerPartition = ReadWriteIOUtils.readInt(byteBuffer);
+    boolean partial = ReadWriteIOUtils.readBoolean(byteBuffer);
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new TopKRankingNode(
+        planNodeId, specification, rankingType, rankingSymbol, maxRankingPerPartition, partial);
+  }
+
+  /**
+   * Returns a single-element list holding the ranking symbol.
+   *
+   * <p>NOTE(review): the children's output symbols are not included here; confirm whether
+   * callers expect them to be passed through as well.
+   */
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return Collections.singletonList(rankingSymbol);
+  }
+
+  /**
+   * Returns a new node with the same id and attributes and the given children.
+   *
+   * @throws IllegalArgumentException if the number of new children differs from the current one
+   */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(children.size() == newChildren.size(), "wrong number of new children");
+    return new TopKRankingNode(
+        id,
+        newChildren,
+        specification,
+        rankingType,
+        rankingSymbol,
+        maxRankingPerPartition,
+        partial);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    TopKRankingNode rankingNode = (TopKRankingNode) o;
+    return Objects.equal(specification, rankingNode.specification)
+        && Objects.equal(rankingType, rankingNode.rankingType)
+        && Objects.equal(rankingSymbol, rankingNode.rankingSymbol)
+        && Objects.equal(maxRankingPerPartition, rankingNode.maxRankingPerPartition)
+        && Objects.equal(partial, rankingNode.partial);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        super.hashCode(),
+        specification,
+        rankingType,
+        rankingSymbol,
+        maxRankingPerPartition,
+        partial);
+  }
+
+  @Override
+  public String toString() {
+    return "TopKRankingNode-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java
new file mode 100644
index 00000000000..7a2a971e6af
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java
@@ -0,0 +1,211 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Source plan node that carries literal rows.
+ *
+ * <p>The node holds its output symbols, a declared row count and, when it has output symbols, one
+ * expression per row. It never has children.
+ */
+public class ValuesNode extends SourceNode {
+  private final List<Symbol> outputSymbols;
+  private final int rowCount;
+  // If ValuesNode produces output symbols, each row in ValuesNode is represented by a single
+  // expression in `rows` list.
+  // It can be an expression of type Row or any other expression that evaluates to RowType.
+  // In case when output symbols are present but ValuesNode does not have any rows, `rows` is an
+  // Optional with empty list.
+  // If ValuesNode does not produce any output symbols, `rows` is Optional.empty().
+  private final Optional<List<Expression>> rows;
+
+  protected TRegionReplicaSet regionReplicaSet;
+
+  /** Constructor of ValuesNode with non-empty output symbols list */
+  public ValuesNode(PlanNodeId id, List<Symbol> outputSymbols, List<Expression> rows) {
+    this(id, outputSymbols, rows.size(), Optional.of(rows));
+  }
+
+  /** Constructor of ValuesNode with empty output symbols list */
+  public ValuesNode(PlanNodeId id, int rowCount) {
+    this(id, ImmutableList.of(), rowCount, Optional.empty());
+  }
+
+  /**
+   * General constructor; validates that the declared row count, the rows and the output symbols
+   * agree with each other.
+   *
+   * @param id plan node id
+   * @param outputSymbols output symbols; must not be null
+   * @param rowCount declared number of rows
+   * @param rows row expressions; must not be null, and must be present when {@code outputSymbols}
+   *     is non-empty
+   * @throws IllegalArgumentException if {@code rowCount} differs from the number of rows, or if
+   *     rows are absent while output symbols are present
+   * @throws IllegalStateException if rows given as {@link Row} differ in size from each other or
+   *     from the number of output symbols
+   */
+  public ValuesNode(
+      PlanNodeId id, List<Symbol> outputSymbols, int rowCount, Optional<List<Expression>> rows) {
+    super(id);
+    this.outputSymbols =
+        ImmutableList.copyOf(requireNonNull(outputSymbols, "outputSymbols is null"));
+    this.rowCount = rowCount;
+
+    requireNonNull(rows, "rows is null");
+    if (rows.isPresent()) {
+      checkArgument(
+          rowCount == rows.get().size(),
+          "declared and actual row counts don't match: %s vs %s",
+          rowCount,
+          rows.get().size());
+
+      // check row size consistency (only for rows specified as Row)
+      List<Integer> rowSizes =
+          rows.get().stream()
+              .map(row -> requireNonNull(row, "row is null"))
+              .filter(expression -> expression instanceof Row)
+              .map(expression -> ((Row) expression).getItems().size())
+              .distinct()
+              .collect(toImmutableList());
+      checkState(rowSizes.size() <= 1, "mismatched rows. All rows must be the same size");
+
+      // check if row size matches the number of output symbols (only for rows specified as Row)
+      if (rowSizes.size() == 1) {
+        checkState(
+            getOnlyElement(rowSizes).equals(outputSymbols.size()),
+            "row size doesn't match the number of output symbols: %s vs %s",
+            getOnlyElement(rowSizes),
+            outputSymbols.size());
+      }
+    } else {
+      checkArgument(
+          outputSymbols.isEmpty(),
+          "missing rows specification for Values with non-empty output symbols");
+    }
+
+    // Rows are dropped when there are no output symbols, so `rows` is empty in that case.
+    if (outputSymbols.isEmpty()) {
+      this.rows = Optional.empty();
+    } else {
+      this.rows = rows.map(ImmutableList::copyOf);
+    }
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  /** No-op: this node never has children. */
+  @Override
+  public void addChild(PlanNode child) {}
+
+  @Override
+  public PlanNode clone() {
+    return new ValuesNode(
+        getPlanNodeId(), outputSymbols, rowCount, rows.map(ImmutableList::copyOf));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return 0;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputSymbols.stream().map(Symbol::getName).collect(toImmutableList());
+  }
+
+  /**
+   * Writes the node type tag followed by: output symbol count, each symbol name, the row count, a
+   * presence flag and, when rows are present, their count and each row expression.
+   */
+  @Override
+  public void serializeAttributes(ByteBuffer byteBuffer) {
+    // The type tag was missing here although the stream overload writes it; both overloads must
+    // produce the same layout.
+    PlanNodeType.TABLE_VALUES_NODE.serialize(byteBuffer);
+
+    ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer);
+    for (Symbol symbol : outputSymbols) {
+      ReadWriteIOUtils.write(symbol.getName(), byteBuffer);
+    }
+    ReadWriteIOUtils.write(rowCount, byteBuffer);
+    if (rows.isPresent()) {
+      ReadWriteIOUtils.write(true, byteBuffer);
+      ReadWriteIOUtils.write(rows.get().size(), byteBuffer);
+      for (Expression expression : rows.get()) {
+        Expression.serialize(expression, byteBuffer);
+      }
+    } else {
+      ReadWriteIOUtils.write(false, byteBuffer);
+    }
+  }
+
+  /** Stream counterpart of {@link #serializeAttributes(ByteBuffer)}; same field order. */
+  @Override
+  public void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_VALUES_NODE.serialize(stream);
+
+    ReadWriteIOUtils.write(outputSymbols.size(), stream);
+    for (Symbol symbol : outputSymbols) {
+      ReadWriteIOUtils.write(symbol.getName(), stream);
+    }
+    ReadWriteIOUtils.write(rowCount, stream);
+    if (rows.isPresent()) {
+      ReadWriteIOUtils.write(true, stream);
+      ReadWriteIOUtils.write(rows.get().size(), stream);
+      for (Expression expression : rows.get()) {
+        Expression.serialize(expression, stream);
+      }
+    } else {
+      ReadWriteIOUtils.write(false, stream);
+    }
+  }
+
+  /**
+   * Rebuilds a node from the attribute layout written by {@code serializeAttributes}, followed by
+   * the plan node id.
+   *
+   * <p>Nothing is written to the buffer and the node type tag is not read here. NOTE(review):
+   * this assumes the caller has already consumed the tag before dispatching; confirm against the
+   * PlanNodeType dispatch code.
+   *
+   * @param byteBuffer buffer positioned at the output symbol count
+   * @return the deserialized node
+   */
+  public static ValuesNode deserialize(ByteBuffer byteBuffer) {
+    // Counts are written as ints, so they must be read back with readInt.
+    int symbolCount = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Symbol> outputSymbols = new ArrayList<>();
+    for (int i = 0; i < symbolCount; i++) {
+      outputSymbols.add(new Symbol(ReadWriteIOUtils.readString(byteBuffer)));
+    }
+
+    int rowCount = ReadWriteIOUtils.readInt(byteBuffer);
+
+    Optional<List<Expression>> rows = Optional.empty();
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      int expressionCount = ReadWriteIOUtils.readInt(byteBuffer);
+      List<Expression> expressions = new ArrayList<>();
+      for (int i = 0; i < expressionCount; i++) {
+        expressions.add(Expression.deserialize(byteBuffer));
+      }
+      rows = Optional.of(expressions);
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new ValuesNode(planNodeId, outputSymbols, rowCount, rows);
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public Optional<List<Expression>> getRows() {
+    return rows;
+  }
+
+  /**
+   * Returns this node unchanged, since it has no children to replace.
+   *
+   * @throws IllegalArgumentException if {@code newChildren} is not empty
+   */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(newChildren.isEmpty(), "newChildren is not empty");
+    return this;
+  }
+
+  @Override
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  /** No-op. */
+  @Override
+  public void open() throws Exception {}
+
+  /** No-op. */
+  @Override
+  public void close() throws Exception {}
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java
index 021d5060540..02a77920c30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java
@@ -136,6 +136,10 @@ public class WindowNode extends SingleChildProcessNode {
     return ImmutableList.copyOf(concat(child.getOutputSymbols(), 
windowFunctions.keySet()));
   }
 
+  /** Returns an immutable set of the symbols that key this node's window functions. */
+  public Set<Symbol> getCreatedSymbols() {
+    return ImmutableSet.<Symbol>builder().addAll(windowFunctions.keySet()).build();
+  }
+
   @Override
   public PlanNode replaceChildren(List<PlanNode> newChildren) {
     return new WindowNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java
new file mode 100644
index 00000000000..25eb13b30e5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java
@@ -0,0 +1,45 @@
+package org.apache.iotdb.db.utils;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class HeapTraversal {
+  public enum Child {
+    LEFT,
+    RIGHT
+  }
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(HeapTraversal.class);
+  private static final long TOP_BIT_MASK = 1L << (Long.SIZE - 1);
+
+  private long shifted;
+  private int treeDepthToNode;
+
+  public void resetWithPathTo(long targetNodeIndex) {
+    checkArgument(targetNodeIndex >= 1, "Target node index must be greater 
than or equal to one");
+    int leadingZeros = Long.numberOfLeadingZeros(targetNodeIndex);
+    // Shift off the leading zeros PLUS the most significant one bit (which is 
not needed for this
+    // calculation)
+    shifted = targetNodeIndex << (leadingZeros + 1);
+    treeDepthToNode = Long.SIZE - (leadingZeros + 1);
+  }
+
+  public boolean isTarget() {
+    return treeDepthToNode == 0;
+  }
+
+  public Child nextChild() {
+    checkState(!isTarget(), "Already at target");
+    Child childToFollow = (shifted & TOP_BIT_MASK) == 0 ? Child.LEFT : 
Child.RIGHT;
+    shifted <<= 1;
+    treeDepthToNode--;
+    return childToFollow;
+  }
+
+  public long sizeOf() {
+    return INSTANCE_SIZE;
+  }
+}


Reply via email to