This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch optimizeMemTableRegionScan-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7641d6ea96316410f96318b4ab5659851705ee69
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 9 19:10:23 2025 +0800

    Optimize memtable region scan (#16883)
---
 .../db/storageengine/dataregion/DataRegion.java    |   6 +-
 .../dataregion/memtable/AbstractMemTable.java      |  85 ++++--
 .../memtable/AlignedWritableMemChunk.java          | 114 ++++++--
 .../dataregion/memtable/IMemTable.java             |   6 +-
 .../dataregion/memtable/TsFileProcessor.java       |  18 +-
 .../dataregion/memtable/WritableMemChunk.java      |  52 ++--
 .../memtable/WritableMemChunkRegionScanTest.java   | 299 +++++++++++++++++++++
 7 files changed, 494 insertions(+), 86 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1856a5448b3..8ce2185d9ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2161,7 +2161,8 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         tsFileResource
             .getProcessor()
-            .queryForSeriesRegionScanWithoutLock(partialPaths, context, 
fileScanHandles);
+            .queryForSeriesRegionScanWithoutLock(
+                partialPaths, context, fileScanHandles, globalTimeFilter);
       }
     }
     return fileScanHandles;
@@ -2238,7 +2239,8 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         tsFileResource
             .getProcessor()
-            .queryForDeviceRegionScanWithoutLock(devicePathsToContext, 
context, fileScanHandles);
+            .queryForDeviceRegionScanWithoutLock(
+                devicePathsToContext, context, fileScanHandles, 
globalTimeFilter);
       }
     }
     return fileScanHandles;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index dcae00700ed..9d1078a471e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -67,6 +67,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -451,7 +452,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetaDataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<Modification, IMemTable>> modsToMemTabled) {
+      List<Pair<Modification, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter) {
 
     IDeviceID deviceID = 
DeviceIDFactory.getInstance().getDeviceID(fullPath.getDevicePath());
 
@@ -469,7 +471,12 @@ public abstract class AbstractMemTable implements 
IMemTable {
                 (MeasurementPath) fullPath, this, modsToMemTabled, 
ttlLowerBound);
       }
       getMemChunkHandleFromMemTable(
-          deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, 
deletionList);
+          deviceID,
+          measurementId,
+          chunkMetaDataMap,
+          memChunkHandleMap,
+          deletionList,
+          globalTimeFilter);
     } else {
       if (!memTableMap.containsKey(deviceID)) {
         return;
@@ -486,7 +493,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
           ((AlignedPath) fullPath).getSchemaList(),
           chunkMetaDataMap,
           memChunkHandleMap,
-          deletionList);
+          deletionList,
+          globalTimeFilter);
     }
   }
 
@@ -497,7 +505,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<Modification, IMemTable>> modsToMemTabled)
+      List<Pair<Modification, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter)
       throws IllegalPathException {
 
     Map<IDeviceID, IWritableMemChunkGroup> memTableMap = getMemTableMap();
@@ -515,7 +524,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
           chunkMetadataMap,
           memChunkHandleMap,
           ttlLowerBound,
-          modsToMemTabled);
+          modsToMemTabled,
+          globalTimeFilter);
     } else {
       getMemChunkHandleFromMemTable(
           deviceID,
@@ -523,7 +533,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
           chunkMetadataMap,
           memChunkHandleMap,
           ttlLowerBound,
-          modsToMemTabled);
+          modsToMemTabled,
+          globalTimeFilter);
     }
   }
 
@@ -532,24 +543,30 @@ public abstract class AbstractMemTable implements 
IMemTable {
       String measurementId,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<TimeRange> deletionList) {
+      List<TimeRange> deletionList,
+      Filter globalTimeFilter) {
 
     WritableMemChunk memChunk =
         (WritableMemChunk) 
memTableMap.get(deviceID).getMemChunkMap().get(measurementId);
 
-    long[] timestamps = memChunk.getFilteredTimestamp(deletionList);
+    if (memChunk == null) {
+      return;
+    }
+    Optional<Long> anySatisfiedTimestamp =
+        memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
+    if (!anySatisfiedTimestamp.isPresent()) {
+      return;
+    }
+    long satisfiedTimestamp = anySatisfiedTimestamp.get();
 
     chunkMetadataMap
         .computeIfAbsent(measurementId, k -> new ArrayList<>())
         .add(
-            buildChunkMetaDataForMemoryChunk(
-                measurementId,
-                timestamps[0],
-                timestamps[timestamps.length - 1],
-                Collections.emptyList()));
+            buildFakeChunkMetaDataForFakeMemoryChunk(
+                measurementId, satisfiedTimestamp, satisfiedTimestamp, 
Collections.emptyList()));
     memChunkHandleMap
         .computeIfAbsent(measurementId, k -> new ArrayList<>())
-        .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
+        .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] 
{satisfiedTimestamp}));
   }
 
   private void getMemAlignedChunkHandleFromMemTable(
@@ -557,7 +574,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       List<IMeasurementSchema> schemaList,
       Map<String, List<IChunkMetadata>> chunkMetadataList,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<List<TimeRange>> deletionList) {
+      List<List<TimeRange>> deletionList,
+      Filter globalTimeFilter) {
 
     AlignedWritableMemChunk alignedMemChunk =
         ((AlignedWritableMemChunkGroup) 
memTableMap.get(deviceID)).getAlignedMemChunk();
@@ -574,7 +592,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
 
     List<BitMap> bitMaps = new ArrayList<>();
-    long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, 
bitMaps);
+    long[] timestamps =
+        alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, 
globalTimeFilter);
+    if (timestamps.length == 0) {
+      return;
+    }
 
     buildAlignedMemChunkHandle(
         deviceID,
@@ -592,7 +614,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       Map<String, List<IChunkMetadata>> chunkMetadataList,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
       long ttlLowerBound,
-      List<Pair<Modification, IMemTable>> modsToMemTabled)
+      List<Pair<Modification, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter)
       throws IllegalPathException {
 
     AlignedWritableMemChunk memChunk = 
writableMemChunkGroup.getAlignedMemChunk();
@@ -611,7 +634,10 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
 
     List<BitMap> bitMaps = new ArrayList<>();
-    long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps);
+    long[] timestamps = memChunk.getAnySatisfiedTimestamp(deletionList, 
bitMaps, globalTimeFilter);
+    if (timestamps.length == 0) {
+      return;
+    }
     buildAlignedMemChunkHandle(
         deviceID,
         timestamps,
@@ -628,7 +654,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
       long ttlLowerBound,
-      List<Pair<Modification, IMemTable>> modsToMemTabled)
+      List<Pair<Modification, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter)
       throws IllegalPathException {
 
     for (Entry<String, IWritableMemChunk> entry :
@@ -646,18 +673,20 @@ public abstract class AbstractMemTable implements 
IMemTable {
                 modsToMemTabled,
                 ttlLowerBound);
       }
-      long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList);
+      Optional<Long> anySatisfiedTimestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(deletionList, 
globalTimeFilter);
+      if (!anySatisfiedTimestamp.isPresent()) {
+        return;
+      }
+      long satisfiedTimestamp = anySatisfiedTimestamp.get();
       chunkMetadataMap
           .computeIfAbsent(measurementId, k -> new ArrayList<>())
           .add(
-              buildChunkMetaDataForMemoryChunk(
-                  measurementId,
-                  timestamps[0],
-                  timestamps[timestamps.length - 1],
-                  Collections.emptyList()));
+              buildFakeChunkMetaDataForFakeMemoryChunk(
+                  measurementId, satisfiedTimestamp, satisfiedTimestamp, 
Collections.emptyList()));
       memChunkHandleMap
           .computeIfAbsent(measurementId, k -> new ArrayList<>())
-          .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
+          .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] 
{satisfiedTimestamp}));
     }
   }
 
@@ -681,7 +710,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       chunkMetadataList
           .computeIfAbsent(measurement, k -> new ArrayList<>())
           .add(
-              buildChunkMetaDataForMemoryChunk(
+              buildFakeChunkMetaDataForFakeMemoryChunk(
                   measurement, startEndTime[0], startEndTime[1], deletion));
       chunkHandleMap
           .computeIfAbsent(measurement, k -> new ArrayList<>())
@@ -712,7 +741,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     return new long[] {startTime, endTime};
   }
 
-  private IChunkMetadata buildChunkMetaDataForMemoryChunk(
+  private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk(
       String measurement, long startTime, long endTime, List<TimeRange> 
deletionList) {
     TimeStatistics timeStatistics = new TimeStatistics();
     timeStatistics.setStartTime(startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index bca23f4df1f..88d55e5bdf8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
@@ -52,6 +53,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
 
@@ -712,29 +714,89 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
     return new Pair<>(reorderedColumnValues, reorderedBitMaps);
   }
 
-  private void filterDeletedTimeStamp(
+  public long[] getAnySatisfiedTimestamp(
+      List<List<TimeRange>> deletionList, List<BitMap> bitMaps, Filter 
globalTimeFilter) {
+    BitMap columnHasNonNullValue = new BitMap(schemaList.size());
+    AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0);
+    Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+
+    getAnySatisfiedTimestamp(
+        list,
+        deletionList,
+        timestampWithBitmap,
+        globalTimeFilter,
+        columnHasNonNullValue,
+        hasNonNullValueColumnCount);
+    for (int i = 0;
+        i < sortedList.size() && hasNonNullValueColumnCount.get() < 
schemaList.size();
+        i++) {
+      getAnySatisfiedTimestamp(
+          sortedList.get(i),
+          deletionList,
+          timestampWithBitmap,
+          globalTimeFilter,
+          columnHasNonNullValue,
+          hasNonNullValueColumnCount);
+    }
+
+    long[] timestamps = new long[timestampWithBitmap.size()];
+    int idx = 0;
+    for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
+      timestamps[idx++] = entry.getKey();
+      bitMaps.add(entry.getValue());
+    }
+    return timestamps;
+  }
+
+  private void getAnySatisfiedTimestamp(
       AlignedTVList alignedTVList,
       List<List<TimeRange>> valueColumnsDeletionList,
-      Map<Long, BitMap> timestampWithBitmap) {
+      Map<Long, BitMap> timestampWithBitmap,
+      Filter globalTimeFilter,
+      BitMap columnHasNonNullValue,
+      AtomicInteger hasNonNullValueColumnCount) {
+    if (globalTimeFilter != null
+        && !globalTimeFilter.satisfyStartEndTime(
+            alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
+      return;
+    }
     BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
-
     int rowCount = alignedTVList.rowCount();
     List<int[]> valueColumnDeleteCursor = new ArrayList<>();
     if (valueColumnsDeletionList != null) {
       valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new 
int[] {0}));
     }
 
+    // example:
+    // globalTimeFilter:null, ignoreAllNullRows: true
+    // tvList:
+    // time s1    s2    s3
+    // 1    1     null  null
+    // 2    null  1     null
+    // 2    1     1     null
+    // 3    1     null  null
+    // 4    1     null  1
+    // timestampWithBitmap:
+    // timestamp: 1 bitmap: 011
+    // timestamp: 2 bitmap: 101
+    // timestamp: 4 bitmap: 110
     for (int row = 0; row < rowCount; row++) {
       // the row is deleted
       if (allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(row)) {
         continue;
       }
       long timestamp = alignedTVList.getTime(row);
+      if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, 
null)) {
+        continue;
+      }
+
+      // Note that this method will only perform bitmap unmarking on the first 
occurrence of a
+      // non-null value in multiple timestamps for the same column.
+      BitMap currentRowNullValueBitmap = null;
 
-      BitMap bitMap = new BitMap(schemaList.size());
       for (int column = 0; column < schemaList.size(); column++) {
         if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), 
column)) {
-          bitMap.mark(column);
+          continue;
         }
 
         // skip deleted row
@@ -744,32 +806,36 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
                 timestamp,
                 valueColumnsDeletionList.get(column),
                 valueColumnDeleteCursor.get(column))) {
-          bitMap.mark(column);
-        }
-
-        // skip all-null row
-        if (bitMap.isAllMarked()) {
           continue;
         }
-        timestampWithBitmap.put(timestamp, bitMap);
+        if (!columnHasNonNullValue.isMarked(column)) {
+          hasNonNullValueColumnCount.incrementAndGet();
+          columnHasNonNullValue.mark(column);
+          currentRowNullValueBitmap =
+              currentRowNullValueBitmap != null
+                  ? currentRowNullValueBitmap
+                  : timestampWithBitmap.computeIfAbsent(
+                      timestamp, k -> getAllMarkedBitmap(schemaList.size()));
+          currentRowNullValueBitmap.unmark(column);
+        }
       }
-    }
-  }
 
-  public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList, 
List<BitMap> bitMaps) {
-    Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+      if (currentRowNullValueBitmap == null) {
+        continue;
+      }
+      // found new column with non-null value
+      timestampWithBitmap.put(timestamp, currentRowNullValueBitmap);
 
-    filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
-    for (AlignedTVList alignedTVList : sortedList) {
-      filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
+      if (hasNonNullValueColumnCount.get() == schemaList.size()) {
+        return;
+      }
     }
+  }
 
-    List<Long> filteredTimestamps = new ArrayList<>();
-    for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
-      filteredTimestamps.add(entry.getKey());
-      bitMaps.add(entry.getValue());
-    }
-    return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
+  private BitMap getAllMarkedBitmap(int size) {
+    BitMap bitMap = new BitMap(size);
+    bitMap.markAll();
+    return bitMap;
   }
 
   // Choose maximum avgPointSizeOfLargestColumn among working and sorted 
AlignedTVList as
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 3a7e9c55092..cd1e9f14432 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -125,7 +125,8 @@ public interface IMemTable extends WALEntryValue {
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<Modification, IMemTable>> modsToMemtabled)
+      List<Pair<Modification, IMemTable>> modsToMemtabled,
+      Filter globalTimeFilter)
       throws IOException, QueryProcessException, MetadataException;
 
   void queryForDeviceRegionScan(
@@ -134,7 +135,8 @@ public interface IMemTable extends WALEntryValue {
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<Modification, IMemTable>> modsToMemtabled)
+      List<Pair<Modification, IMemTable>> modsToMemtabled,
+      Filter globalTimeFilter)
       throws IOException, QueryProcessException, MetadataException;
 
   /** putBack all the memory resources. */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index a508ec48a88..6bbcb427f87 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1826,7 +1826,8 @@ public class TsFileProcessor {
   public void queryForSeriesRegionScanWithoutLock(
       List<PartialPath> pathList,
       QueryContext queryContext,
-      List<IFileScanHandle> fileScanHandlesForQuery) {
+      List<IFileScanHandle> fileScanHandlesForQuery,
+      Filter globalTimeFilter) {
     long startTime = System.nanoTime();
     try {
       Map<IDeviceID, Map<String, List<IChunkHandle>>> 
deviceToMemChunkHandleMap = new HashMap<>();
@@ -1846,7 +1847,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetaList,
                 measurementToChunkHandleList,
-                modsToMemtable);
+                modsToMemtable,
+                globalTimeFilter);
           }
           if (workMemTable != null) {
             workMemTable.queryForSeriesRegionScan(
@@ -1854,7 +1856,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetaList,
                 measurementToChunkHandleList,
-                null);
+                null,
+                globalTimeFilter);
           }
           IDeviceID deviceID = 
DeviceIDFactory.getInstance().getDeviceID(seriesPath.getDevice());
           // Some memTable have been flushed already, so we need to get the 
chunk metadata from
@@ -1905,7 +1908,8 @@ public class TsFileProcessor {
   public void queryForDeviceRegionScanWithoutLock(
       Map<IDeviceID, DeviceContext> devicePathsToContext,
       QueryContext queryContext,
-      List<IFileScanHandle> fileScanHandlesForQuery) {
+      List<IFileScanHandle> fileScanHandlesForQuery,
+      Filter globalTimeFilter) {
     long startTime = System.nanoTime();
     try {
       Map<IDeviceID, Map<String, List<IChunkHandle>>> 
deviceToMemChunkHandleMap = new HashMap<>();
@@ -1930,7 +1934,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetadataList,
                 measurementToMemChunkHandleList,
-                modsToMemtable);
+                modsToMemtable,
+                globalTimeFilter);
           }
           if (workMemTable != null) {
             workMemTable.queryForDeviceRegionScan(
@@ -1939,7 +1944,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetadataList,
                 measurementToMemChunkHandleList,
-                null);
+                null,
+                globalTimeFilter);
           }
 
           buildChunkHandleForFlushedMemTable(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 01e91387612..c19ad4f5a22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -43,10 +44,9 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 
@@ -554,11 +554,30 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
     return sortedList;
   }
 
-  private void filterDeletedTimestamp(
-      TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
-    long lastTime = Long.MIN_VALUE;
+  public Optional<Long> getAnySatisfiedTimestamp(
+      List<TimeRange> deletionList, Filter globalTimeFilter) {
+    Optional<Long> anySatisfiedTimestamp =
+        getAnySatisfiedTimestamp(list, deletionList, globalTimeFilter);
+    if (anySatisfiedTimestamp.isPresent()) {
+      return anySatisfiedTimestamp;
+    }
+    for (TVList tvList : sortedList) {
+      anySatisfiedTimestamp = getAnySatisfiedTimestamp(tvList, deletionList, 
globalTimeFilter);
+      if (anySatisfiedTimestamp.isPresent()) {
+        break;
+      }
+    }
+    return anySatisfiedTimestamp;
+  }
+
+  private Optional<Long> getAnySatisfiedTimestamp(
+      TVList tvlist, List<TimeRange> deletionList, Filter globalTimeFilter) {
     int[] deletionCursor = {0};
     int rowCount = tvlist.rowCount();
+    if (globalTimeFilter != null
+        && !globalTimeFilter.satisfyStartEndTime(tvlist.getMinTime(), 
tvlist.getMaxTime())) {
+      return Optional.empty();
+    }
     for (int i = 0; i < rowCount; i++) {
       if (tvlist.getBitMap() != null && 
tvlist.isNullValue(tvlist.getValueIndex(i))) {
         continue;
@@ -568,26 +587,11 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
           && ModificationUtils.isPointDeleted(curTime, deletionList, 
deletionCursor)) {
         continue;
       }
-
-      if (i == rowCount - 1 || curTime != lastTime) {
-        timestampList.add(curTime);
+      if (globalTimeFilter != null && !globalTimeFilter.satisfy(curTime, 
null)) {
+        continue;
       }
-      lastTime = curTime;
+      return Optional.of(curTime);
     }
-  }
-
-  public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
-    List<Long> timestampList = new ArrayList<>();
-    filterDeletedTimestamp(list, deletionList, timestampList);
-    for (TVList tvList : sortedList) {
-      filterDeletedTimestamp(tvList, deletionList, timestampList);
-    }
-
-    // remove duplicated time
-    List<Long> distinctTimestamps = 
timestampList.stream().distinct().collect(Collectors.toList());
-    // sort timestamps
-    long[] filteredTimestamps = 
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
-    Arrays.sort(filteredTimestamps);
-    return filteredTimestamps;
+    return Optional.empty();
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
new file mode 100644
index 00000000000..63bbac18e38
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.memtable;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class WritableMemChunkRegionScanTest {
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {{0}, {1000}, {10000}, {20000}});
+  }
+
+  private int defaultTvListThreshold;
+  private int tvListSortThreshold;
+
+  public WritableMemChunkRegionScanTest(int tvListSortThreshold) {
+    this.tvListSortThreshold = tvListSortThreshold;
+  }
+
+  @Before
+  public void setup() {
+    defaultTvListThreshold = 
IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold();
+    
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(tvListSortThreshold);
+  }
+
+  @After
+  public void tearDown() {
+    
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(defaultTvListThreshold);
+  }
+
+  @Test
+  public void testAlignedWritableMemChunkRegionScan() throws 
IllegalPathException {
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    try {
+      List<IMeasurementSchema> measurementSchemas =
+          Arrays.asList(
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT32),
+              new MeasurementSchema("s3", TSDataType.INT32));
+      AlignedWritableMemChunk writableMemChunk = null;
+      int size = 100000;
+      for (int i = 0; i < size; i++) {
+        if (i <= 10000) {
+          memTable.writeAlignedRow(
+              new PlainDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {1, null, 1});
+        } else if (i <= 20000) {
+          memTable.writeAlignedRow(
+              new PlainDeviceID("root.test.d1"),
+              measurementSchemas,
+              i,
+              new Object[] {null, null, 2});
+        } else if (i <= 30000) {
+          memTable.writeAlignedRow(
+              new PlainDeviceID("root.test.d1"),
+              measurementSchemas,
+              i,
+              new Object[] {3, null, null});
+        } else {
+          memTable.writeAlignedRow(
+              new PlainDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {4, 4, 4});
+        }
+      }
+      writableMemChunk =
+          (AlignedWritableMemChunk)
+              memTable.getWritableMemChunk(new PlainDeviceID("root.test.d1"), 
"");
+      List<BitMap> bitMaps = new ArrayList<>();
+      long[] timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+              bitMaps,
+              null);
+      Assert.assertEquals(2, timestamps.length);
+      Assert.assertEquals(0, timestamps[0]);
+      Assert.assertFalse(bitMaps.get(0).isMarked(0));
+      Assert.assertTrue(bitMaps.get(0).isMarked(1));
+      Assert.assertFalse(bitMaps.get(0).isMarked(2));
+      Assert.assertTrue(bitMaps.get(1).isMarked(0));
+      Assert.assertFalse(bitMaps.get(1).isMarked(1));
+      Assert.assertTrue(bitMaps.get(1).isMarked(2));
+      Assert.assertEquals(30001, timestamps[1]);
+
+      bitMaps = new ArrayList<>();
+      timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(),
+                  Collections.emptyList(),
+                  Collections.singletonList(new TimeRange(0, 12000))),
+              bitMaps,
+              new TimeFilterOperators.TimeGt(10000000));
+      Assert.assertEquals(0, timestamps.length);
+
+      bitMaps = new ArrayList<>();
+      timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(),
+                  Collections.emptyList(),
+                  Collections.singletonList(new TimeRange(0, 12000))),
+              bitMaps,
+              new TimeFilterOperators.TimeGt(11000));
+
+      Assert.assertEquals(3, timestamps.length);
+      Assert.assertEquals(12001, timestamps[0]);
+      Assert.assertTrue(bitMaps.get(0).isMarked(0));
+      Assert.assertTrue(bitMaps.get(0).isMarked(1));
+      Assert.assertFalse(bitMaps.get(0).isMarked(2));
+      Assert.assertEquals(20001, timestamps[1]);
+      Assert.assertFalse(bitMaps.get(1).isMarked(0));
+      Assert.assertTrue(bitMaps.get(1).isMarked(1));
+      Assert.assertTrue(bitMaps.get(1).isMarked(2));
+      Assert.assertEquals(30001, timestamps[2]);
+      Assert.assertTrue(bitMaps.get(2).isMarked(0));
+      Assert.assertFalse(bitMaps.get(2).isMarked(1));
+      Assert.assertTrue(bitMaps.get(2).isMarked(2));
+
+      writableMemChunk.writeAlignedPoints(
+          1000001, new Object[] {1, null, null}, measurementSchemas);
+      writableMemChunk.writeAlignedPoints(
+          1000002, new Object[] {null, 1, null}, measurementSchemas);
+      writableMemChunk.writeAlignedPoints(1000002, new Object[] {1, 1, null}, 
measurementSchemas);
+      writableMemChunk.writeAlignedPoints(
+          1000003, new Object[] {1, null, null}, measurementSchemas);
+      writableMemChunk.writeAlignedPoints(1000004, new Object[] {1, null, 1}, 
measurementSchemas);
+      bitMaps = new ArrayList<>();
+      timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+              bitMaps,
+              new TimeFilterOperators.TimeGt(1000000));
+      Assert.assertEquals(3, timestamps.length);
+      Assert.assertEquals(1000001, timestamps[0]);
+      Assert.assertFalse(bitMaps.get(0).isMarked(0));
+      Assert.assertTrue(bitMaps.get(0).isMarked(1));
+      Assert.assertTrue(bitMaps.get(0).isMarked(2));
+      Assert.assertEquals(1000002, timestamps[1]);
+      Assert.assertTrue(bitMaps.get(1).isMarked(0));
+      Assert.assertFalse(bitMaps.get(1).isMarked(1));
+      Assert.assertTrue(bitMaps.get(1).isMarked(2));
+      Assert.assertEquals(1000004, timestamps[2]);
+      Assert.assertTrue(bitMaps.get(2).isMarked(0));
+      Assert.assertTrue(bitMaps.get(2).isMarked(1));
+      Assert.assertFalse(bitMaps.get(2).isMarked(2));
+
+      Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>();
+      memTable.queryForDeviceRegionScan(
+          new PlainDeviceID("root.test.d1"),
+          true,
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1000000));
+      Assert.assertEquals(3, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {1000001, 1000001}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000002, 1000002}, 
chunkHandleMap.get("s2").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000004, 1000004}, 
chunkHandleMap.get("s3").get(0).getPageStatisticsTime());
+
+      memTable.queryForSeriesRegionScan(
+          new AlignedPath(
+              "root.test.d1",
+              measurementSchemas.stream()
+                  .map(IMeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas),
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1000000));
+      Assert.assertEquals(3, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {1000001, 1000001}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000002, 1000002}, 
chunkHandleMap.get("s2").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000004, 1000004}, 
chunkHandleMap.get("s3").get(0).getPageStatisticsTime());
+    } finally {
+      memTable.release();
+    }
+  }
+
+  @Test
+  public void testNonAlignedWritableMemChunkRegionScan() throws 
IllegalPathException {
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    try {
+      MeasurementSchema measurementSchema = new MeasurementSchema("s1", 
TSDataType.INT32);
+      int size = 100000;
+      for (int i = 0; i < size; i++) {
+        memTable.write(
+            new PlainDeviceID("root.test.d1"),
+            Collections.singletonList(measurementSchema),
+            i,
+            new Object[] {i});
+      }
+      WritableMemChunk writableMemChunk =
+          (WritableMemChunk) memTable.getWritableMemChunk(new 
PlainDeviceID("root.test.d1"), "s1");
+      Optional<Long> timestamp = 
writableMemChunk.getAnySatisfiedTimestamp(null, null);
+      Assert.assertTrue(timestamp.isPresent());
+      Assert.assertEquals(0, timestamp.get().longValue());
+
+      timestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              null, new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L));
+      Assert.assertTrue(timestamp.isPresent());
+      Assert.assertEquals(1000, timestamp.get().longValue());
+
+      timestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Collections.singletonList(new TimeRange(1, 1500)),
+              new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L));
+      Assert.assertTrue(timestamp.isPresent());
+      Assert.assertEquals(1501, timestamp.get().longValue());
+
+      timestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Collections.singletonList(new TimeRange(1, 1500)),
+              new TimeFilterOperators.TimeBetweenAnd(100000L, 200000L));
+      Assert.assertFalse(timestamp.isPresent());
+
+      Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>();
+      memTable.queryForDeviceRegionScan(
+          new PlainDeviceID("root.test.d1"),
+          false,
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1));
+      Assert.assertEquals(1, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {2, 2}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+      memTable.queryForSeriesRegionScan(
+          new MeasurementPath("root.test.d1", "s1", measurementSchema),
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1));
+      Assert.assertEquals(1, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {2, 2}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+    } finally {
+      memTable.release();
+    }
+  }
+}


Reply via email to