This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch skipNotSatisfiedTimeRange-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a179718fb5b5af0d395f7a99d8b0bd2868a74d58
Author: shuwenwei <[email protected]>
AuthorDate: Fri Oct 17 18:29:12 2025 +0800

    Optimize memtable scan
---
 .../fragment/FragmentInstanceContext.java          |  20 ++
 .../execution/operator/source/SeriesScanUtil.java  |  65 +++++-
 .../read/reader/common/NoDataPointReader.java      |  59 ++++++
 .../db/utils/datastructure/AlignedTVList.java      |  90 +++++---
 .../iotdb/db/utils/datastructure/LazyBitMap.java   |  93 +++++++++
 .../db/utils/datastructure/MemPointIterator.java   |   3 +
 .../MergeSortMultiAlignedTVListIterator.java       |  22 ++
 .../MergeSortMultiTVListIterator.java              |  22 ++
 .../OrderedMultiAlignedTVListIterator.java         |  24 +++
 .../datastructure/OrderedMultiTVListIterator.java  |  24 +++
 .../iotdb/db/utils/datastructure/TVList.java       | 229 ++++++++++++++++++++-
 .../memtable/AlignedTVListIteratorTest.java        | 110 ++++++++++
 .../memtable/NonAlignedTVListIteratorTest.java     |  87 ++++++++
 13 files changed, 799 insertions(+), 49 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index cdb44db5f66..c8c89d05399 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -55,12 +55,14 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -91,6 +93,7 @@ public class FragmentInstanceContext extends QueryContext {
 
   protected IDataRegionForQuery dataRegion;
   private Filter globalTimeFilter;
+  private List<TimeRange> globalTimeFilterTimeRanges;
 
   // it will only be used once, after sharedQueryDataSource being inited, it 
will be set to null
   protected List<PartialPath> sourcePaths;
@@ -511,6 +514,23 @@ public class FragmentInstanceContext extends QueryContext {
     return globalTimeFilter;
   }
 
+  public List<TimeRange> getGlobalTimeFilterTimeRanges() {
+    if (globalTimeFilter == null) {
+      return Collections.singletonList(new TimeRange(Long.MIN_VALUE, 
Long.MAX_VALUE));
+    }
+    List<TimeRange> local = globalTimeFilterTimeRanges;
+    if (local == null) {
+      synchronized (this) {
+        local = globalTimeFilterTimeRanges;
+        if (local == null) {
+          local = globalTimeFilter.getTimeRanges();
+          globalTimeFilterTimeRanges = local;
+        }
+      }
+    }
+    return local;
+  }
+
   public IDataRegionForQuery getDataRegion() {
     return dataRegion;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index d9e49d63ebf..2df717d9738 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLo
 import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemPageReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.DescPriorityMergeReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.MergeReaderPriority;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.NoDataPointReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.PriorityMergeReader;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
@@ -131,7 +132,10 @@ public class SeriesScanUtil implements Accountable {
           + RamUsageEstimator.shallowSizeOfInstance(IDeviceID.class)
           + RamUsageEstimator.shallowSizeOfInstance(TimeOrderUtils.class)
           + RamUsageEstimator.shallowSizeOfInstance(PaginationController.class)
-          + RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class);
+          + RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class)
+          + RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);
+
+  protected TimeRange satisfiedTimeRange;
 
   public SeriesScanUtil(
       PartialPath seriesPath,
@@ -193,6 +197,20 @@ public class SeriesScanUtil implements Accountable {
     // init file index
     orderUtils.setCurSeqFileIndex(dataSource);
     curUnseqFileIndex = 0;
+
+    if (satisfiedTimeRange == null) {
+      long startTime = Long.MAX_VALUE;
+      long endTime = Long.MIN_VALUE;
+      if (scanOptions.getGlobalTimeFilter() == null) {
+        satisfiedTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+        return;
+      }
+      for (TimeRange timeRange : context.getGlobalTimeFilterTimeRanges()) {
+        startTime = Math.min(startTime, timeRange.getMin());
+        endTime = Math.max(endTime, timeRange.getMax());
+      }
+      satisfiedTimeRange = new TimeRange(startTime, endTime);
+    }
   }
 
   protected PriorityMergeReader getPriorityMergeReader() {
@@ -660,6 +678,13 @@ public class SeriesScanUtil implements Accountable {
         readOnlyMemChunk.createMemPointIterator(
             orderUtils.getScanOrder(), scanOptions.getGlobalTimeFilter());
     for (Statistics<? extends Serializable> statistics : statisticsList) {
+      long orderTime = orderUtils.getOrderTime(statistics);
+      boolean canSkip =
+          (orderUtils.getAscending() && orderTime > 
satisfiedTimeRange.getMax())
+              || (!orderUtils.getAscending() && orderTime < 
satisfiedTimeRange.getMin());
+      if (canSkip) {
+        break;
+      }
       IVersionPageReader versionPageReader =
           new LazyMemVersionPageReader(
               context,
@@ -1434,6 +1459,7 @@ public class SeriesScanUtil implements Accountable {
     protected final boolean isSeq;
     protected final boolean isAligned;
     private boolean inited = false;
+    private boolean hasData = true;
 
     LazyMemVersionPageReader(
         QueryContext context,
@@ -1453,11 +1479,14 @@ public class SeriesScanUtil implements Accountable {
     }
 
     public IPointReader getPointReader() {
+      if (!hasData) {
+        return NoDataPointReader.getInstance();
+      }
       return memPointIterator;
     }
 
     public boolean hasNextBatch() {
-      return memPointIterator.hasNextBatch();
+      return hasData && memPointIterator.hasNextBatch();
     }
 
     public void setCurrentPageTimeRangeToMemPointIterator() {
@@ -1466,10 +1495,34 @@ public class SeriesScanUtil implements Accountable {
       }
       if (statistics.getStartTime() > statistics.getEndTime()) {
         // empty
+        hasData = false;
         return;
       }
-      this.memPointIterator.setCurrentPageTimeRange(
-          new TimeRange(statistics.getStartTime(), statistics.getEndTime()));
+      Filter globalTimeFilter = ((FragmentInstanceContext) 
context).getGlobalTimeFilter();
+      if (globalTimeFilter == null) {
+        this.memPointIterator.setCurrentPageTimeRange(
+            new TimeRange(statistics.getStartTime(), statistics.getEndTime()));
+        return;
+      }
+
+      long startTime = statistics.getStartTime();
+      long endTime = statistics.getEndTime();
+      long minStart = Long.MAX_VALUE;
+      long maxEnd = Long.MIN_VALUE;
+      for (TimeRange timeRange :
+          ((FragmentInstanceContext) context).getGlobalTimeFilterTimeRanges()) 
{
+        if (timeRange.overlaps(new TimeRange(startTime, endTime))) {
+          minStart = Math.min(minStart, Math.max(timeRange.getMin(), 
startTime));
+          maxEnd = Math.max(maxEnd, Math.min(timeRange.getMax(), endTime));
+        }
+      }
+
+      if (minStart > maxEnd) {
+        hasData = false;
+        return;
+      }
+
+      this.memPointIterator.setCurrentPageTimeRange(new TimeRange(minStart, 
maxEnd));
     }
 
     public TsBlock nextBatch() {
@@ -1605,7 +1658,7 @@ public class SeriesScanUtil implements Accountable {
     @SuppressWarnings("squid:S3740")
     @Override
     public long getOverlapCheckTime(Statistics range) {
-      return range.getStartTime();
+      return Math.max(satisfiedTimeRange.getMin(), range.getStartTime());
     }
 
     @SuppressWarnings("squid:S3740")
@@ -1734,7 +1787,7 @@ public class SeriesScanUtil implements Accountable {
     @SuppressWarnings("squid:S3740")
     @Override
     public long getOverlapCheckTime(Statistics range) {
-      return range.getEndTime();
+      return Math.min(satisfiedTimeRange.getMax(), range.getEndTime());
     }
 
     @SuppressWarnings("squid:S3740")
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/NoDataPointReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/NoDataPointReader.java
new file mode 100644
index 00000000000..945318f6d22
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/NoDataPointReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
+
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+
+public class NoDataPointReader implements IPointReader {
+
+  private NoDataPointReader() {}
+
+  private static final IPointReader instance = new NoDataPointReader();
+
+  public static IPointReader getInstance() {
+    return instance;
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair() throws IOException {
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair() throws IOException {
+    return null;
+  }
+
+  @Override
+  public TimeValuePair currentTimeValuePair() throws IOException {
+    return null;
+  }
+
+  @Override
+  public long getUsedMemorySize() {
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {}
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 18853a94596..cf701383b37 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -1617,13 +1617,16 @@ public abstract class AlignedTVList extends TVList {
 
     @Override
     public TsBlock nextBatch() {
-      TsBlockBuilder builder = new TsBlockBuilder(maxNumberOfPointsInPage, 
dataTypeList);
+      int maxRowCountOfCurrentBatch = Math.min(rows - index, 
maxNumberOfPointsInPage);
+      TsBlockBuilder builder = new TsBlockBuilder(maxRowCountOfCurrentBatch, 
dataTypeList);
       // Time column
       TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
 
       int validRowCount = 0;
-      // duplicated time or deleted time are all invalid, true if we don't 
need this row
-      BitMap timeDuplicateInfo = null;
+      // Rows that are deleted or whose timestamps do not match the filter are 
considered invalid.
+      // The corresponding bit is set to true if the row is not needed.
+      LazyBitMap timeInvalidInfo = null;
+      LazyBitMap timeDuplicatedInfo = null;
 
       int startIndex = index;
       // time column
@@ -1632,12 +1635,15 @@ public abstract class AlignedTVList extends TVList {
         if (validRowCount >= maxNumberOfPointsInPage || 
isCurrentTimeExceedTimeRange(time)) {
           break;
         }
-        // skip empty row
-        if (allValueColDeletedMap != null
-            && 
allValueColDeletedMap.isMarked(getValueIndex(getScanOrderIndex(index)))) {
-          continue;
-        }
-        if (!isTimeSatisfied(time)) {
+        // skip invalid row
+        if ((allValueColDeletedMap != null
+                && 
allValueColDeletedMap.isMarked(getValueIndex(getScanOrderIndex(index))))
+            || !isTimeSatisfied(time)) {
+          timeInvalidInfo =
+              timeInvalidInfo == null
+                  ? new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1)
+                  : timeInvalidInfo;
+          timeInvalidInfo.mark(index);
           continue;
         }
         int nextRowIndex = index + 1;
@@ -1645,20 +1651,25 @@ public abstract class AlignedTVList extends TVList {
             && ((allValueColDeletedMap != null
                     && allValueColDeletedMap.isMarked(
                         getValueIndex(getScanOrderIndex(nextRowIndex))))
-                || !isTimeSatisfied(time))) {
+                || 
!isTimeSatisfied(getTime(getScanOrderIndex(nextRowIndex))))) {
+          timeInvalidInfo =
+              timeInvalidInfo == null
+                  ? new LazyBitMap(nextRowIndex, maxRowCountOfCurrentBatch, 
rows - 1)
+                  : timeInvalidInfo;
+          timeInvalidInfo.mark(nextRowIndex);
           nextRowIndex++;
         }
         if ((nextRowIndex == rows || time != 
getTime(getScanOrderIndex(nextRowIndex)))) {
           timeBuilder.writeLong(time);
           validRowCount++;
         } else {
-          if (Objects.isNull(timeDuplicateInfo)) {
-            timeDuplicateInfo = new BitMap(rows);
+          if (Objects.isNull(timeDuplicatedInfo)) {
+            timeDuplicatedInfo = new LazyBitMap(index, 
maxRowCountOfCurrentBatch, rows - 1);
           }
-          // For this timeInvalidInfo, we mark all positions that are not the 
last one in the
+          // For this timeDuplicatedInfo, we mark all positions that are not 
the last one in the
           // ASC traversal. It has the same behaviour for the DESC traversal, 
because our ultimate
           // goal is to process all the data with the same timestamp before 
writing it into TsBlock.
-          timeDuplicateInfo.mark(index);
+          timeDuplicatedInfo.mark(index);
         }
         index = nextRowIndex - 1;
       }
@@ -1673,21 +1684,20 @@ public abstract class AlignedTVList extends TVList {
         int[] deleteCursor = {0};
         // Pair of Time and Index
         Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
-        if (Objects.nonNull(timeDuplicateInfo)) {
+        if (Objects.nonNull(timeDuplicatedInfo)) {
           lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, 
null);
         }
         ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex);
         currentWriteRowIndex = 0;
         for (int sortedRowIndex = startIndex; sortedRowIndex < index; 
sortedRowIndex++) {
-          // skip empty row
-          if ((allValueColDeletedMap != null
-                  && allValueColDeletedMap.isMarked(
-                      getValueIndex(getScanOrderIndex(sortedRowIndex))))
-              || !isTimeSatisfied(getTime(getScanOrderIndex(sortedRowIndex)))) 
{
-            continue;
+          // skip invalid rows
+          if (Objects.nonNull(timeInvalidInfo)) {
+            if (timeInvalidInfo.isMarked(sortedRowIndex)) {
+              continue;
+            }
           }
-          // skip time duplicated or totally deleted rows
-          if (Objects.nonNull(timeDuplicateInfo)) {
+          // skip time duplicated rows
+          if (Objects.nonNull(timeDuplicatedInfo)) {
             if (!outer.isNullValue(
                 getValueIndex(getScanOrderIndex(sortedRowIndex)), 
validColumnIndex)) {
               lastValidPointIndexForTimeDupCheck.left = 
getTime(getScanOrderIndex(sortedRowIndex));
@@ -1702,11 +1712,11 @@ public abstract class AlignedTVList extends TVList {
                     getValueIndex(getScanOrderIndex(sortedRowIndex));
               }
             }
-            // timeInvalidInfo was constructed when traversing the time column 
before. It can be
-            // reused when traversing each value column to skip deleted rows 
or non-last rows with
+            // timeDuplicatedInfo was constructed when traversing the time 
column before. It can be
+            // reused when traversing each value column to skip non-last rows 
with
             // duplicated timestamps.
             // Until the last duplicate timestamp is encountered, it will be 
skipped here.
-            if (timeDuplicateInfo.isMarked(sortedRowIndex)) {
+            if (timeDuplicatedInfo.isMarked(sortedRowIndex)) {
               continue;
             }
           }
@@ -1759,9 +1769,9 @@ public abstract class AlignedTVList extends TVList {
       TsBlock tsBlock = builder.build();
       if (needRebuildTsBlock(hasAnyNonNullValue)) {
         // if exist all null rows, at most have validRowCount - 1 valid rows
+        // When rebuilding TsBlock, pushDownFilter and paginationController 
are also processed.
         tsBlock = reBuildTsBlock(hasAnyNonNullValue, validRowCount, 
dataTypeList, tsBlock);
-      }
-      if (pushDownFilter != null) {
+      } else if (pushDownFilter != null) {
         tsBlock =
             TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
                 tsBlock,
@@ -1823,21 +1833,35 @@ public abstract class AlignedTVList extends TVList {
         int previousValidRowCount,
         List<TSDataType> tsDataTypeList,
         TsBlock previousTsBlock) {
+      boolean[] selection = hasAnyNonNullValue;
+      if (pushDownFilter != null) {
+        selection = pushDownFilter.satisfyTsBlock(hasAnyNonNullValue, 
previousTsBlock);
+      }
       TsBlockBuilder builder = new TsBlockBuilder(previousValidRowCount - 1, 
tsDataTypeList);
       TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
       Column timeColumn = previousTsBlock.getTimeColumn();
+      int stopIndex = previousValidRowCount;
       for (int i = 0; i < previousValidRowCount; i++) {
-        if (hasAnyNonNullValue[i]) {
-          timeColumnBuilder.writeLong(timeColumn.getLong(i));
-          builder.declarePosition();
+        if (selection[i]) {
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            selection[i] = false;
+          } else if (paginationController.hasCurLimit()) {
+            timeColumnBuilder.writeLong(timeColumn.getLong(i));
+            builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            stopIndex = i;
+            break;
+          }
         }
       }
 
       for (int columnIndex = 0; columnIndex < tsDataTypeList.size(); 
columnIndex++) {
         ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
         Column column = previousTsBlock.getColumn(columnIndex);
-        for (int i = 0; i < previousValidRowCount; i++) {
-          if (hasAnyNonNullValue[i]) {
+        for (int i = 0; i < stopIndex; i++) {
+          if (selection[i]) {
             if (column.isNull(i)) {
               columnBuilder.appendNull();
             } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LazyBitMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LazyBitMap.java
new file mode 100644
index 00000000000..dd5eac1a528
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LazyBitMap.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.tsfile.utils.BitMap;
+
+import java.util.ArrayList;
+
+public class LazyBitMap {
+  private final int startPosition;
+  private final int endPosition;
+  private final int blockSize;
+  private final ArrayList<BitMap> blocks;
+
+  public LazyBitMap(int startIndex, int appendSize, int endIndex) {
+    if (endIndex < startIndex) {
+      throw new IllegalArgumentException("endIndex must be >= startIndex");
+    }
+    if (appendSize <= 0) {
+      throw new IllegalArgumentException("appendSize must be positive");
+    }
+    this.startPosition = startIndex;
+    this.endPosition = endIndex;
+    this.blockSize = appendSize;
+    this.blocks = new ArrayList<>(2);
+  }
+
+  public void mark(int index) {
+    if (index < startPosition) {
+      throw new IndexOutOfBoundsException("Index below startPosition: " + 
index);
+    }
+    if (index > endPosition) {
+      throw new IndexOutOfBoundsException("Index exceeds endPosition: " + 
index);
+    }
+    int blockIndex = getBlockIndex(index);
+    ensureCapacity(blockIndex);
+    BitMap block = blocks.get(blockIndex);
+    if (block == null) {
+      block = new BitMap(blockSize);
+      blocks.set(blockIndex, block);
+    }
+    block.mark(getInnerIndex(index));
+  }
+
+  private void ensureCapacity(int blockIndex) {
+    while (blockIndex >= blocks.size()) {
+      blocks.add(null);
+    }
+  }
+
+  public boolean isMarked(int index) {
+    if (index < startPosition) {
+      return false;
+    }
+    if (index > endPosition) {
+      throw new IndexOutOfBoundsException("Index exceeds endPosition: " + 
index);
+    }
+    int blockIndex = getBlockIndex(index);
+    if (blockIndex >= blocks.size()) {
+      return false;
+    }
+    BitMap block = blocks.get(blockIndex);
+    if (block == null) {
+      return false;
+    }
+    return block.isMarked(getInnerIndex(index));
+  }
+
+  private int getBlockIndex(int index) {
+    return (index - startPosition) / blockSize;
+  }
+
+  private int getInnerIndex(int index) {
+    return (index - startPosition) % blockSize;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
index 1cce0ff0b4d..ca5686f7ac3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
@@ -69,8 +69,11 @@ public abstract class MemPointIterator implements 
IPointReader {
 
   public void setCurrentPageTimeRange(TimeRange timeRange) {
     this.timeRange = timeRange;
+    skipToCurrentTimeRangeStartPosition();
   }
 
+  protected void skipToCurrentTimeRangeStartPosition() {}
+
   protected boolean isCurrentTimeExceedTimeRange(long time) {
     return timeRange != null
         && (scanOrder.isAscending() ? (time > timeRange.getMax()) : (time < 
timeRange.getMin()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
index c507bd75d50..7fa1949959d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
@@ -86,6 +86,20 @@ public class MergeSortMultiAlignedTVListIterator extends 
MultiAlignedTVListItera
                     a.left.equals(b.left) ? a.right.compareTo(b.right) : 
b.left.compareTo(a.left));
   }
 
+  @Override
+  protected void skipToCurrentTimeRangeStartPosition() {
+    hasNext = false;
+    probeIterators.clear();
+    for (int i = 0; i < alignedTvListIterators.size(); i++) {
+      AlignedTVList.AlignedTVListIterator iterator = 
alignedTvListIterators.get(i);
+      iterator.skipToCurrentTimeRangeStartPosition();
+      if (iterator.hasNextTimeValuePair()) {
+        probeIterators.add(i);
+      }
+    }
+    probeNext = false;
+  }
+
   @Override
   protected void prepareNext() {
     hasNext = false;
@@ -288,4 +302,12 @@ public class MergeSortMultiAlignedTVListIterator extends 
MultiAlignedTVListItera
   protected int currentRowIndex(int columnIndex) {
     return rowIndices[columnIndex];
   }
+
+  @Override
+  public void setCurrentPageTimeRange(TimeRange timeRange) {
+    for (TVList.TVListIterator tvListIterator : this.alignedTvListIterators) {
+      tvListIterator.timeRange = timeRange;
+    }
+    super.setCurrentPageTimeRange(timeRange);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
index 2e33811c257..aebd71dbddd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
@@ -73,6 +73,20 @@ public class MergeSortMultiTVListIterator extends 
MultiTVListIterator {
                     a.left.equals(b.left) ? a.right.compareTo(b.right) : 
b.left.compareTo(a.left));
   }
 
+  @Override
+  protected void skipToCurrentTimeRangeStartPosition() {
+    hasNext = false;
+    probeIterators.clear();
+    for (int i = 0; i < tvListIterators.size(); i++) {
+      TVList.TVListIterator iterator = tvListIterators.get(i);
+      iterator.skipToCurrentTimeRangeStartPosition();
+      if (iterator.hasNextTimeValuePair()) {
+        probeIterators.add(i);
+      }
+    }
+    probeNext = false;
+  }
+
   @Override
   protected void prepareNext() {
     hasNext = false;
@@ -167,4 +181,12 @@ public class MergeSortMultiTVListIterator extends 
MultiTVListIterator {
       }
     }
   }
+
+  @Override
+  public void setCurrentPageTimeRange(TimeRange timeRange) {
+    for (TVList.TVListIterator tvListIterator : this.tvListIterators) {
+      tvListIterator.timeRange = timeRange;
+    }
+    super.setCurrentPageTimeRange(timeRange);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
index 46d8c5f02ac..b21a619d238 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
@@ -98,6 +98,22 @@ public class OrderedMultiAlignedTVListIterator extends 
MultiAlignedTVListIterato
     probeNext = true;
   }
 
+  @Override
+  protected void skipToCurrentTimeRangeStartPosition() {
+    hasNext = false;
+    iteratorIndex = 0;
+    while (iteratorIndex < alignedTvListIterators.size() && !hasNext) {
+      AlignedTVList.AlignedTVListIterator iterator = 
alignedTvListIterators.get(iteratorIndex);
+      iterator.skipToCurrentTimeRangeStartPosition();
+      if (!iterator.hasNextTimeValuePair()) {
+        iteratorIndex++;
+        continue;
+      }
+      hasNext = iterator.hasNextTimeValuePair();
+    }
+    probeNext = false;
+  }
+
   @Override
   protected void next() {
     TVList.TVListIterator iterator = alignedTvListIterators.get(iteratorIndex);
@@ -129,4 +145,12 @@ public class OrderedMultiAlignedTVListIterator extends 
MultiAlignedTVListIterato
   protected int currentRowIndex(int columnIndex) {
     return rowIndices[columnIndex];
   }
+
+  @Override
+  public void setCurrentPageTimeRange(TimeRange timeRange) {
+    for (AlignedTVList.AlignedTVListIterator iterator : 
alignedTvListIterators) {
+      iterator.timeRange = timeRange;
+    }
+    super.setCurrentPageTimeRange(timeRange);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
index 61ae5f1360d..27f79eca846 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
@@ -68,6 +68,22 @@ public class OrderedMultiTVListIterator extends 
MultiTVListIterator {
     probeNext = true;
   }
 
+  @Override
+  protected void skipToCurrentTimeRangeStartPosition() {
+    hasNext = false;
+    iteratorIndex = 0;
+    while (iteratorIndex < tvListIterators.size() && !hasNext) {
+      TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
+      iterator.skipToCurrentTimeRangeStartPosition();
+      if (!iterator.hasNextTimeValuePair()) {
+        iteratorIndex++;
+        continue;
+      }
+      hasNext = iterator.hasNextTimeValuePair();
+    }
+    probeNext = false;
+  }
+
   @Override
   protected void next() {
     tvListIterators.get(iteratorIndex).next();
@@ -90,4 +106,12 @@ public class OrderedMultiTVListIterator extends 
MultiTVListIterator {
     }
     probeNext = false;
   }
+
+  @Override
+  public void setCurrentPageTimeRange(TimeRange timeRange) {
+    for (TVList.TVListIterator tvListIterator : this.tvListIterators) {
+      tvListIterator.timeRange = timeRange;
+    }
+    super.setCurrentPageTimeRange(timeRange);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index e37fdec551e..8fa7925bc3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -199,6 +199,105 @@ public abstract class TVList implements WALEntryValue {
     return timestamps.get(arrayIndex)[elementIndex];
   }
 
+  /**
+   * Performs a binary search to find the first position whose timestamp is 
greater than or equal to
+   * the given {@code time}.
+   *
+   * <p>This method assumes timestamps are sorted in ascending order. If the 
list is not sorted, an
+   * {@link UnsupportedOperationException} will be thrown.
+   *
+   * <p>Typical use case: locate the starting index of a time range query.
+   *
+   * <p>Example:
+   *
+   * <ul>
+   *   <li>timestamps = [10, 20, 20, 25, 30]
+   *   <li>time = 5 → return 0
+   *   <li>time = 20 → return 1
+   *   <li>time = 21 → return 3
+   *   <li>time = 40 → return 5 (all timestamps &lt; 40)
+   * </ul>
+   *
+   * <p><b>Return value range:</b>
+   *
+   * <ul>
+   *   <li>When a matching or greater element exists: {@code 0 <= index <= 
seqRowCount - 1}
+   *   <li>When all elements are smaller than {@code time}: {@code index == 
seqRowCount}
+   * </ul>
+   *
+   * @param time the target timestamp
+   * @param low the lower bound index (inclusive)
+   * @param high the upper bound index (inclusive)
+   * @return the index of the first timestamp ≥ {@code time}, or {@code 
seqRowCount} if all
+   *     timestamps are smaller
+   */
+  private int binarySearchTimestampFirstGreaterOrEqualsPosition(long time, int 
low, int high) {
+    if (!sorted && high >= seqRowCount) {
+      throw new UnsupportedOperationException("Current TVList is not sorted");
+    }
+    int mid;
+    while (low <= high) {
+      mid = low + ((high - low) >>> 1);
+      long midTime = getTime(mid);
+      if (midTime < time) {
+        low = mid + 1;
+      } else {
+        high = mid - 1;
+      }
+    }
+    return low;
+  }
+
+  /**
+   * Performs a binary search to find the last position whose timestamp is 
less than or equal to the
+   * given {@code time}.
+   *
+   * <p>This method assumes timestamps are sorted in ascending order. If the 
list is not sorted, an
+   * {@link UnsupportedOperationException} will be thrown.
+   *
+   * <p>Typical use case: locate the ending index of a time range query.
+   *
+   * <p>Example:
+   *
+   * <ul>
+   *   <li>timestamps = [10, 20, 20, 25, 30]
+   *   <li>time = 5 → return -1 (no timestamp ≤ 5)
+   *   <li>time = 20 → return 2
+   *   <li>time = 21 → return 2
+   *   <li>time = 50 → return 4
+   * </ul>
+   *
+   * <p><b>Return value range:</b>
+   *
+   * <ul>
+   *   <li>When a matching or smaller element exists: {@code 0 <= index <= 
seqRowCount - 1}
+   *   <li>When all elements are greater than {@code time}: {@code index == -1}
+   * </ul>
+   *
+   * @param time the target timestamp
+   * @param low the lower bound index (inclusive)
+   * @param high the upper bound index (inclusive)
+   * @return the index of the last timestamp ≤ {@code time}, or {@code -1} if 
all timestamps are
+   *     greater
+   */
+  private int binarySearchTimestampLastLessOrEqualsPosition(long time, int 
low, int high) {
+    if (!sorted && high >= seqRowCount) {
+      throw new UnsupportedOperationException("Current TVList is not sorted");
+    }
+
+    int mid;
+    while (low <= high) {
+      mid = low + ((high - low) >>> 1);
+      long midTime = getTime(mid);
+      if (midTime <= time) {
+        low = mid + 1;
+      } else {
+        high = mid - 1;
+      }
+    }
+    return high;
+  }
+
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     int srcV = getValueIndex(src);
@@ -699,6 +798,61 @@ public abstract class TVList implements WALEntryValue {
       deleteCursor = new int[] {cursor};
     }
 
+    @Override
+    protected void skipToCurrentTimeRangeStartPosition() {
+      if (timeRange == null || index >= rows) {
+        return;
+      }
+      if (timeRange.contains(getTime(getScanOrderIndex(index)))) {
+        return;
+      }
+
+      int indexInTVList;
+      // Since there may be duplicate timestamps in TVList, we need to move to 
the index of the
+      // first timestamp that meets the requirements under current scanOrder.
+      if (scanOrder.isAscending()) {
+        long searchTimestamp = timeRange.getMin();
+        if (searchTimestamp <= outer.getMinTime()) {
+          return;
+        }
+        if (searchTimestamp > outer.getMaxTime()) {
+          // all satisfied data has been consumed
+          index = rows;
+          probeNext = true;
+          return;
+        }
+        // For asc scan, if it can not be found, the indexInTVList is too 
small, and we should move
+        // to the next timestamp position.
+        // If it can be found, move to the min index of current timestamp.
+        indexInTVList =
+            binarySearchTimestampFirstGreaterOrEqualsPosition(
+                searchTimestamp, getScanOrderIndex(index), rows - 1);
+      } else {
+        long searchTimestamp = timeRange.getMax();
+        if (searchTimestamp >= outer.getMaxTime()) {
+          return;
+        }
+        if (searchTimestamp < outer.getMinTime()) {
+          // all satisfied data has been consumed
+          index = rows;
+          probeNext = true;
+          return;
+        }
+        // For desc scan, regardless of whether it is found, the timestamp 
corresponding to
+        // indexInTVList has met the conditions. We only need to find the 
index that first
+        // encounters this timestamp during desc scan.
+        indexInTVList =
+            binarySearchTimestampLastLessOrEqualsPosition(
+                searchTimestamp, 0, getScanOrderIndex(index));
+      }
+      int newIndex = getScanOrderIndex(indexInTVList);
+      if (newIndex > index) {
+        index = newIndex;
+      }
+
+      probeNext = false;
+    }
+
     protected void prepareNext() {
       if (scanOrder.isAscending()) {
         // For ASC traversal, we first find a valid index and then handle 
duplicate timestamps.
@@ -808,10 +962,19 @@ public abstract class TVList implements WALEntryValue {
     @Override
     public TsBlock nextBatch() {
       TSDataType dataType = getDataType();
-      TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(dataType));
+      int maxRowCountOfCurrentBatch =
+          Math.min(
+              paginationController.hasLimit()
+                  ? (int) paginationController.getCurLimit()
+                  : Integer.MAX_VALUE,
+              Math.min(maxNumberOfPointsInPage, rows - index));
+      TsBlockBuilder builder =
+          new TsBlockBuilder(maxRowCountOfCurrentBatch, 
Collections.singletonList(dataType));
       switch (dataType) {
         case BOOLEAN:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -822,6 +985,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               boolean aBoolean = getBoolean(getScanOrderIndex(index));
               if (pushDownFilter == null || 
pushDownFilter.satisfyBoolean(time, aBoolean)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeBoolean(aBoolean);
                 builder.declarePosition();
@@ -832,7 +1001,9 @@ public abstract class TVList implements WALEntryValue {
           break;
         case INT32:
         case DATE:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -843,6 +1014,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               int anInt = getInt(getScanOrderIndex(index));
               if (pushDownFilter == null || 
pushDownFilter.satisfyInteger(time, anInt)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeInt(anInt);
                 builder.declarePosition();
@@ -853,7 +1030,9 @@ public abstract class TVList implements WALEntryValue {
           break;
         case INT64:
         case TIMESTAMP:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -864,6 +1043,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               long aLong = getLong(getScanOrderIndex(index));
               if (pushDownFilter == null || pushDownFilter.satisfyLong(time, 
aLong)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeLong(aLong);
                 builder.declarePosition();
@@ -873,7 +1058,9 @@ public abstract class TVList implements WALEntryValue {
           }
           break;
         case FLOAT:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -886,6 +1073,12 @@ public abstract class TVList implements WALEntryValue {
                   roundValueWithGivenPrecision(
                       getFloat(getScanOrderIndex(index)), floatPrecision, 
encoding);
               if (pushDownFilter == null || pushDownFilter.satisfyFloat(time, 
aFloat)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeFloat(aFloat);
                 builder.declarePosition();
@@ -895,7 +1088,9 @@ public abstract class TVList implements WALEntryValue {
           }
           break;
         case DOUBLE:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -908,6 +1103,12 @@ public abstract class TVList implements WALEntryValue {
                   roundValueWithGivenPrecision(
                       getDouble(getScanOrderIndex(index)), floatPrecision, 
encoding);
               if (pushDownFilter == null || pushDownFilter.satisfyDouble(time, 
aDouble)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeDouble(aDouble);
                 builder.declarePosition();
@@ -919,7 +1120,9 @@ public abstract class TVList implements WALEntryValue {
         case TEXT:
         case BLOB:
         case STRING:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -930,6 +1133,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               Binary binary = getBinary(getScanOrderIndex(index));
               if (pushDownFilter == null || pushDownFilter.satisfyBinary(time, 
binary)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeBinary(binary);
                 builder.declarePosition();
@@ -942,9 +1151,9 @@ public abstract class TVList implements WALEntryValue {
           throw new UnSupportedDataTypeException(
               String.format("Data type %s is not supported.", dataType));
       }
-      // There is no need to process pushDownFilter here because it has been 
applied when
-      // constructing the tsBlock
-      TsBlock tsBlock = paginationController.applyTsBlock(builder.build());
+      // There is no need to process pushDownFilter and paginationController 
here because it has
+      // been applied when constructing the tsBlock
+      TsBlock tsBlock = builder.build();
       addTsBlock(tsBlock);
       return tsBlock;
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
index 758e0869f2c..442f69109e2 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
@@ -159,6 +160,27 @@ public class AlignedTVListIteratorTest {
             Collections.emptyList()),
         false,
         10);
+    tvListMap = buildAlignedSingleTvListMap(Collections.singletonList(new 
TimeRange(1, 10)));
+    testAligned(
+        tvListMap,
+        Ordering.ASC,
+        new TimeFilterOperators.TimeBetweenAnd(1L, 10L),
+        new LongFilterOperators.ValueBetweenAnd(0, 1, 10),
+        new PaginationController(10, 1),
+        Collections.singletonList(new TimeRange(4, 4)),
+        Arrays.asList(Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+        true,
+        8);
+    testAligned(
+        tvListMap,
+        Ordering.DESC,
+        new TimeFilterOperators.TimeBetweenAnd(1L, 10L),
+        new LongFilterOperators.ValueBetweenAnd(0, 1, 10),
+        new PaginationController(10, 1),
+        Collections.singletonList(new TimeRange(4, 4)),
+        Arrays.asList(Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+        true,
+        8);
   }
 
   @Test
@@ -806,4 +828,92 @@ public class AlignedTVListIteratorTest {
         : new PaginationController(
             paginationController.getCurLimit(), 
paginationController.getCurOffset());
   }
+
+  @Test
+  public void testSkipTimeRange() throws QueryProcessException, IOException {
+    List<Map<TVList, Integer>> list =
+        Arrays.asList(
+            buildAlignedSingleTvListMap(
+                Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))),
+            buildAlignedMultiTvListMap(Arrays.asList(new TimeRange(10, 20), 
new TimeRange(22, 40))),
+            buildAlignedMultiTvListMap(
+                Arrays.asList(
+                    new TimeRange(10, 20),
+                    new TimeRange(10, 20),
+                    new TimeRange(24, 30),
+                    new TimeRange(22, 40))));
+    for (Map<TVList, Integer> tvListMap : list) {
+      testSkipTimeRange(
+          tvListMap,
+          Ordering.ASC,
+          Arrays.asList(new TimeRange(11, 13), new TimeRange(21, 21), new 
TimeRange(33, 34)),
+          Arrays.asList(new TimeRange(11, 13), new TimeRange(33, 34)));
+      testSkipTimeRange(
+          tvListMap,
+          Ordering.DESC,
+          Arrays.asList(new TimeRange(33, 34), new TimeRange(21, 21), new 
TimeRange(11, 13)),
+          Arrays.asList(new TimeRange(33, 34), new TimeRange(11, 13)));
+    }
+  }
+
+  private void testSkipTimeRange(
+      Map<TVList, Integer> tvListMap,
+      Ordering scanOrder,
+      List<TimeRange> statisticsTimeRanges,
+      List<TimeRange> expectedResultTimeRange)
+      throws QueryProcessException, IOException {
+    List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
+    IMeasurementSchema measurementSchema = getMeasurementSchema();
+    AlignedReadOnlyMemChunk chunk =
+        new AlignedReadOnlyMemChunk(
+            fragmentInstanceContext,
+            columnIdxList,
+            measurementSchema,
+            tvListMap,
+            Collections.emptyList(),
+            Arrays.asList(
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()));
+    chunk.sortTvLists();
+    chunk.initChunkMetaFromTVListsWithFakeStatistics();
+    MemPointIterator memPointIterator = 
chunk.createMemPointIterator(scanOrder, null);
+    List<Long> expectedTimestamps = new ArrayList<>();
+    for (TimeRange timeRange : expectedResultTimeRange) {
+      if (scanOrder == Ordering.ASC) {
+        for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) {
+          expectedTimestamps.add(i);
+        }
+      } else {
+        for (long i = timeRange.getMax(); i >= timeRange.getMin(); i--) {
+          expectedTimestamps.add(i);
+        }
+      }
+    }
+    List<Long> resultTimestamps = new ArrayList<>(expectedTimestamps.size());
+    for (TimeRange timeRange : statisticsTimeRanges) {
+      memPointIterator.setCurrentPageTimeRange(timeRange);
+      while (memPointIterator.hasNextBatch()) {
+        TsBlock tsBlock = memPointIterator.nextBatch();
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long currentTimestamp = tsBlock.getTimeByIndex(i);
+          long value = tsBlock.getColumn(0).getLong(i);
+          Assert.assertEquals(currentTimestamp, value);
+          resultTimestamps.add(currentTimestamp);
+        }
+      }
+    }
+    Assert.assertEquals(expectedTimestamps, resultTimestamps);
+
+    memPointIterator = chunk.createMemPointIterator(scanOrder, null);
+
+    resultTimestamps.clear();
+    for (TimeRange timeRange : statisticsTimeRanges) {
+      memPointIterator.setCurrentPageTimeRange(timeRange);
+      while (memPointIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = memPointIterator.nextTimeValuePair();
+        Assert.assertEquals(timeValuePair.getTimestamp(), 
timeValuePair.getValues()[0]);
+        resultTimestamps.add(timeValuePair.getTimestamp());
+      }
+    }
+    Assert.assertEquals(expectedTimestamps, resultTimestamps);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
index c22dcbb6b0f..b28979efd67 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
@@ -630,4 +630,91 @@ public class NonAlignedTVListIteratorTest {
     }
     return tvListMap;
   }
+
+  @Test
+  public void testSkipTimeRange() throws QueryProcessException, IOException {
+    List<Map<TVList, Integer>> list =
+        Arrays.asList(
+            buildNonAlignedSingleTvListMap(
+                Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))),
+            buildNonAlignedMultiTvListMap(
+                Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))),
+            buildNonAlignedMultiTvListMap(
+                Arrays.asList(
+                    new TimeRange(10, 20),
+                    new TimeRange(10, 20),
+                    new TimeRange(24, 30),
+                    new TimeRange(22, 40))));
+    for (Map<TVList, Integer> tvListMap : list) {
+      testSkipTimeRange(
+          tvListMap,
+          Ordering.ASC,
+          Arrays.asList(new TimeRange(11, 13), new TimeRange(21, 21), new 
TimeRange(33, 34)),
+          Arrays.asList(new TimeRange(11, 13), new TimeRange(33, 34)));
+      testSkipTimeRange(
+          tvListMap,
+          Ordering.DESC,
+          Arrays.asList(new TimeRange(33, 34), new TimeRange(21, 21), new 
TimeRange(11, 13)),
+          Arrays.asList(new TimeRange(33, 34), new TimeRange(11, 13)));
+    }
+  }
+
+  private void testSkipTimeRange(
+      Map<TVList, Integer> tvListMap,
+      Ordering scanOrder,
+      List<TimeRange> statisticsTimeRanges,
+      List<TimeRange> expectedResultTimeRange)
+      throws QueryProcessException, IOException {
+    ReadOnlyMemChunk chunk =
+        new ReadOnlyMemChunk(
+            fragmentInstanceContext,
+            "s1",
+            TSDataType.INT64,
+            TSEncoding.PLAIN,
+            tvListMap,
+            null,
+            Collections.emptyList());
+    chunk.sortTvLists();
+    chunk.initChunkMetaFromTVListsWithFakeStatistics();
+    MemPointIterator memPointIterator = 
chunk.createMemPointIterator(scanOrder, null);
+    List<Long> expectedTimestamps = new ArrayList<>();
+    for (TimeRange timeRange : expectedResultTimeRange) {
+      if (scanOrder == Ordering.ASC) {
+        for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) {
+          expectedTimestamps.add(i);
+        }
+      } else {
+        for (long i = timeRange.getMax(); i >= timeRange.getMin(); i--) {
+          expectedTimestamps.add(i);
+        }
+      }
+    }
+    List<Long> resultTimestamps = new ArrayList<>(expectedTimestamps.size());
+    for (TimeRange timeRange : statisticsTimeRanges) {
+      memPointIterator.setCurrentPageTimeRange(timeRange);
+      while (memPointIterator.hasNextBatch()) {
+        TsBlock tsBlock = memPointIterator.nextBatch();
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long currentTimestamp = tsBlock.getTimeByIndex(i);
+          long value = tsBlock.getColumn(0).getLong(i);
+          Assert.assertEquals(currentTimestamp, value);
+          resultTimestamps.add(currentTimestamp);
+        }
+      }
+    }
+    Assert.assertEquals(expectedTimestamps, resultTimestamps);
+
+    memPointIterator = chunk.createMemPointIterator(scanOrder, null);
+
+    resultTimestamps.clear();
+    for (TimeRange timeRange : statisticsTimeRanges) {
+      memPointIterator.setCurrentPageTimeRange(timeRange);
+      while (memPointIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = memPointIterator.nextTimeValuePair();
+        Assert.assertEquals(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getLong());
+        resultTimestamps.add(timeValuePair.getTimestamp());
+      }
+    }
+    Assert.assertEquals(expectedTimestamps, resultTimestamps);
+  }
 }

Reply via email to