This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bdea04155b9e06dfeb2396bc7e82b20b35593580
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 9 19:10:23 2025 +0800

    Optimize memtable region scan (#16883)
    
    (cherry picked from commit a899c48a09c94de1c6bfd7730d013b5d12b1a0b9)
---
 .../db/storageengine/dataregion/DataRegion.java    |   6 +-
 .../dataregion/memtable/AbstractMemTable.java      |  86 +++--
 .../memtable/AlignedWritableMemChunk.java          | 132 ++++++--
 .../dataregion/memtable/IMemTable.java             |   6 +-
 .../dataregion/memtable/TsFileProcessor.java       |  18 +-
 .../dataregion/memtable/WritableMemChunk.java      |  52 +--
 .../memtable/WritableMemChunkRegionScanTest.java   | 359 +++++++++++++++++++++
 7 files changed, 572 insertions(+), 87 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1124e33a7df..4db15d48bc6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2476,7 +2476,8 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         tsFileResource
             .getProcessor()
-            .queryForSeriesRegionScanWithoutLock(partialPaths, context, 
fileScanHandles);
+            .queryForSeriesRegionScanWithoutLock(
+                partialPaths, context, fileScanHandles, globalTimeFilter);
       }
     }
     return fileScanHandles;
@@ -2553,7 +2554,8 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         tsFileResource
             .getProcessor()
-            .queryForDeviceRegionScanWithoutLock(devicePathsToContext, 
context, fileScanHandles);
+            .queryForDeviceRegionScanWithoutLock(
+                devicePathsToContext, context, fileScanHandles, 
globalTimeFilter);
       }
     }
     return fileScanHandles;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 92803984cee..22606f232a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -70,6 +70,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -488,7 +489,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetaDataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
+      List<Pair<ModEntry, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter) {
 
     IDeviceID deviceID = fullPath.getDeviceId();
     if (fullPath instanceof NonAlignedFullPath) {
@@ -506,7 +508,12 @@ public abstract class AbstractMemTable implements 
IMemTable {
                 fullPath.getDeviceId(), measurementId, this, modsToMemTabled, 
ttlLowerBound);
       }
       getMemChunkHandleFromMemTable(
-          deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, 
deletionList);
+          deviceID,
+          measurementId,
+          chunkMetaDataMap,
+          memChunkHandleMap,
+          deletionList,
+          globalTimeFilter);
     } else {
       // check If MemTable Contains this path
       if (!memTableMap.containsKey(deviceID)) {
@@ -528,7 +535,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
           ((AlignedFullPath) fullPath).getSchemaList(),
           chunkMetaDataMap,
           memChunkHandleMap,
-          deletionList);
+          deletionList,
+          globalTimeFilter);
     }
   }
 
@@ -539,7 +547,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
+      List<Pair<ModEntry, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter) {
 
     Map<IDeviceID, IWritableMemChunkGroup> memTableMap = getMemTableMap();
 
@@ -556,7 +565,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
           chunkMetadataMap,
           memChunkHandleMap,
           ttlLowerBound,
-          modsToMemTabled);
+          modsToMemTabled,
+          globalTimeFilter);
     } else {
       getMemChunkHandleFromMemTable(
           deviceID,
@@ -564,7 +574,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
           chunkMetadataMap,
           memChunkHandleMap,
           ttlLowerBound,
-          modsToMemTabled);
+          modsToMemTabled,
+          globalTimeFilter);
     }
   }
 
@@ -573,24 +584,30 @@ public abstract class AbstractMemTable implements 
IMemTable {
       String measurementId,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<TimeRange> deletionList) {
+      List<TimeRange> deletionList,
+      Filter globalTimeFilter) {
 
     WritableMemChunk memChunk =
         (WritableMemChunk) 
memTableMap.get(deviceID).getMemChunkMap().get(measurementId);
 
-    long[] timestamps = memChunk.getFilteredTimestamp(deletionList);
+    if (memChunk == null) {
+      return;
+    }
+    Optional<Long> anySatisfiedTimestamp =
+        memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
+    if (!anySatisfiedTimestamp.isPresent()) {
+      return;
+    }
+    long satisfiedTimestamp = anySatisfiedTimestamp.get();
 
     chunkMetadataMap
         .computeIfAbsent(measurementId, k -> new ArrayList<>())
         .add(
-            buildChunkMetaDataForMemoryChunk(
-                measurementId,
-                timestamps[0],
-                timestamps[timestamps.length - 1],
-                Collections.emptyList()));
+            buildFakeChunkMetaDataForFakeMemoryChunk(
+                measurementId, satisfiedTimestamp, satisfiedTimestamp, 
Collections.emptyList()));
     memChunkHandleMap
         .computeIfAbsent(measurementId, k -> new ArrayList<>())
-        .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
+        .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] 
{satisfiedTimestamp}));
   }
 
   private void getMemAlignedChunkHandleFromMemTable(
@@ -598,7 +615,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       List<IMeasurementSchema> schemaList,
       Map<String, List<IChunkMetadata>> chunkMetadataList,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<List<TimeRange>> deletionList) {
+      List<List<TimeRange>> deletionList,
+      Filter globalTimeFilter) {
 
     AlignedWritableMemChunk alignedMemChunk =
         ((AlignedWritableMemChunkGroup) 
memTableMap.get(deviceID)).getAlignedMemChunk();
@@ -615,7 +633,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
 
     List<BitMap> bitMaps = new ArrayList<>();
-    long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, 
bitMaps, true);
+    long[] timestamps =
+        alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, 
globalTimeFilter);
+    if (timestamps.length == 0) {
+      return;
+    }
 
     buildAlignedMemChunkHandle(
         deviceID,
@@ -633,7 +655,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       Map<String, List<IChunkMetadata>> chunkMetadataList,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
       long ttlLowerBound,
-      List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
+      List<Pair<ModEntry, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter) {
 
     AlignedWritableMemChunk memChunk = 
writableMemChunkGroup.getAlignedMemChunk();
     List<IMeasurementSchema> schemaList = memChunk.getSchemaList();
@@ -648,7 +671,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
 
     List<BitMap> bitMaps = new ArrayList<>();
-    long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps, 
true);
+    long[] timestamps =
+        memChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, 
globalTimeFilter);
+    if (timestamps.length == 0) {
+      return;
+    }
     buildAlignedMemChunkHandle(
         deviceID,
         timestamps,
@@ -665,7 +692,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
       long ttlLowerBound,
-      List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
+      List<Pair<ModEntry, IMemTable>> modsToMemTabled,
+      Filter globalTimeFilter) {
 
     for (Entry<String, IWritableMemChunk> entry :
         writableMemChunkGroup.getMemChunkMap().entrySet()) {
@@ -679,18 +707,20 @@ public abstract class AbstractMemTable implements 
IMemTable {
             ModificationUtils.constructDeletionList(
                 deviceID, measurementId, this, modsToMemTabled, ttlLowerBound);
       }
-      long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList);
+      Optional<Long> anySatisfiedTimestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(deletionList, 
globalTimeFilter);
+      if (!anySatisfiedTimestamp.isPresent()) {
+        return;
+      }
+      long satisfiedTimestamp = anySatisfiedTimestamp.get();
       chunkMetadataMap
           .computeIfAbsent(measurementId, k -> new ArrayList<>())
           .add(
-              buildChunkMetaDataForMemoryChunk(
-                  measurementId,
-                  timestamps[0],
-                  timestamps[timestamps.length - 1],
-                  Collections.emptyList()));
+              buildFakeChunkMetaDataForFakeMemoryChunk(
+                  measurementId, satisfiedTimestamp, satisfiedTimestamp, 
Collections.emptyList()));
       memChunkHandleMap
           .computeIfAbsent(measurementId, k -> new ArrayList<>())
-          .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
+          .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] 
{satisfiedTimestamp}));
     }
   }
 
@@ -714,7 +744,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       chunkMetadataList
           .computeIfAbsent(measurement, k -> new ArrayList<>())
           .add(
-              buildChunkMetaDataForMemoryChunk(
+              buildFakeChunkMetaDataForFakeMemoryChunk(
                   measurement, startEndTime[0], startEndTime[1], deletion));
       chunkHandleMap
           .computeIfAbsent(measurement, k -> new ArrayList<>())
@@ -745,7 +775,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     return new long[] {startTime, endTime};
   }
 
-  private IChunkMetadata buildChunkMetaDataForMemoryChunk(
+  private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk(
       String measurement, long startTime, long endTime, List<TimeRange> 
deletionList) {
     TimeStatistics timeStatistics = new TimeStatistics();
     timeStatistics.setStartTime(startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 5f6af57746f..f42867d4772 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -34,6 +34,7 @@ import org.apache.tsfile.encrypt.EncryptParameter;
 import org.apache.tsfile.encrypt.EncryptUtils;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
@@ -55,6 +56,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
 
@@ -287,30 +289,99 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
     return new Pair<>(reorderedColumnValues, reorderedBitMaps);
   }
 
-  private void filterDeletedTimeStamp(
+  public long[] getAnySatisfiedTimestamp(
+      List<List<TimeRange>> deletionList,
+      List<BitMap> bitMaps,
+      boolean ignoreAllNullRows,
+      Filter globalTimeFilter) {
+    BitMap columnHasNonNullValue = new BitMap(schemaList.size());
+    AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0);
+    Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+
+    getAnySatisfiedTimestamp(
+        list,
+        deletionList,
+        ignoreAllNullRows,
+        timestampWithBitmap,
+        globalTimeFilter,
+        columnHasNonNullValue,
+        hasNonNullValueColumnCount);
+    for (int i = 0;
+        i < sortedList.size() && hasNonNullValueColumnCount.get() < 
schemaList.size();
+        i++) {
+      if (!ignoreAllNullRows && !timestampWithBitmap.isEmpty()) {
+        // count devices in table model
+        break;
+      }
+      getAnySatisfiedTimestamp(
+          sortedList.get(i),
+          deletionList,
+          ignoreAllNullRows,
+          timestampWithBitmap,
+          globalTimeFilter,
+          columnHasNonNullValue,
+          hasNonNullValueColumnCount);
+    }
+
+    long[] timestamps = new long[timestampWithBitmap.size()];
+    int idx = 0;
+    for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
+      timestamps[idx++] = entry.getKey();
+      bitMaps.add(entry.getValue());
+    }
+    return timestamps;
+  }
+
+  private void getAnySatisfiedTimestamp(
       AlignedTVList alignedTVList,
       List<List<TimeRange>> valueColumnsDeletionList,
       boolean ignoreAllNullRows,
-      Map<Long, BitMap> timestampWithBitmap) {
+      Map<Long, BitMap> timestampWithBitmap,
+      Filter globalTimeFilter,
+      BitMap columnHasNonNullValue,
+      AtomicInteger hasNonNullValueColumnCount) {
+    if (globalTimeFilter != null
+        && !globalTimeFilter.satisfyStartEndTime(
+            alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
+      return;
+    }
     BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
-
     int rowCount = alignedTVList.rowCount();
     List<int[]> valueColumnDeleteCursor = new ArrayList<>();
     if (valueColumnsDeletionList != null) {
       valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new 
int[] {0}));
     }
 
+    // example:
+    // globalTimeFilter:null, ignoreAllNullRows: true
+    // tvList:
+    // time s1    s2    s3
+    // 1    1     null  null
+    // 2    null  1     null
+    // 2    1     1     null
+    // 3    1     null  null
+    // 4    1     null  1
+    // timestampWithBitmap:
+    // timestamp: 1 bitmap: 011
+    // timestamp: 2 bitmap: 101
+    // timestamp: 4 bitmap: 110
     for (int row = 0; row < rowCount; row++) {
       // the row is deleted
       if (allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(row)) {
         continue;
       }
       long timestamp = alignedTVList.getTime(row);
+      if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, 
null)) {
+        continue;
+      }
+
+      // Note that this method will only perform bitmap unmarking on the first 
occurrence of a
+      // non-null value in multiple timestamps for the same column.
+      BitMap currentRowNullValueBitmap = null;
 
-      BitMap bitMap = new BitMap(schemaList.size());
       for (int column = 0; column < schemaList.size(); column++) {
         if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), 
column)) {
-          bitMap.mark(column);
+          continue;
         }
 
         // skip deleted row
@@ -320,33 +391,44 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
                 timestamp,
                 valueColumnsDeletionList.get(column),
                 valueColumnDeleteCursor.get(column))) {
-          bitMap.mark(column);
-        }
-
-        // skip all-null row
-        if (ignoreAllNullRows && bitMap.isAllMarked()) {
           continue;
         }
-        timestampWithBitmap.put(timestamp, bitMap);
+        if (!columnHasNonNullValue.isMarked(column)) {
+          hasNonNullValueColumnCount.incrementAndGet();
+          columnHasNonNullValue.mark(column);
+          currentRowNullValueBitmap =
+              currentRowNullValueBitmap != null
+                  ? currentRowNullValueBitmap
+                  : timestampWithBitmap.computeIfAbsent(
+                      timestamp, k -> getAllMarkedBitmap(schemaList.size()));
+          currentRowNullValueBitmap.unmark(column);
+        }
       }
-    }
-  }
 
-  public long[] getFilteredTimestamp(
-      List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean 
ignoreAllNullRows) {
-    Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+      if (!ignoreAllNullRows) {
+        timestampWithBitmap.put(
+            timestamp,
+            currentRowNullValueBitmap != null
+                ? currentRowNullValueBitmap
+                : getAllMarkedBitmap(schemaList.size()));
+        return;
+      }
+      if (currentRowNullValueBitmap == null) {
+        continue;
+      }
+      // found new column with non-null value
+      timestampWithBitmap.put(timestamp, currentRowNullValueBitmap);
 
-    filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, 
timestampWithBitmap);
-    for (AlignedTVList alignedTVList : sortedList) {
-      filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, 
timestampWithBitmap);
+      if (hasNonNullValueColumnCount.get() == schemaList.size()) {
+        return;
+      }
     }
+  }
 
-    List<Long> filteredTimestamps = new ArrayList<>();
-    for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
-      filteredTimestamps.add(entry.getKey());
-      bitMaps.add(entry.getValue());
-    }
-    return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
+  private BitMap getAllMarkedBitmap(int size) {
+    BitMap bitMap = new BitMap(size);
+    bitMap.markAll();
+    return bitMap;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 6c6e09c5782..fd9ffe90b0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -125,7 +125,8 @@ public interface IMemTable extends WALEntryValue {
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<ModEntry, IMemTable>> modsToMemtabled)
+      List<Pair<ModEntry, IMemTable>> modsToMemtabled,
+      Filter globalTimeFilter)
       throws IOException, QueryProcessException, MetadataException;
 
   void queryForDeviceRegionScan(
@@ -134,7 +135,8 @@ public interface IMemTable extends WALEntryValue {
       long ttlLowerBound,
       Map<String, List<IChunkMetadata>> chunkMetadataMap,
       Map<String, List<IChunkHandle>> memChunkHandleMap,
-      List<Pair<ModEntry, IMemTable>> modsToMemtabled)
+      List<Pair<ModEntry, IMemTable>> modsToMemtabled,
+      Filter globalTimeFilter)
       throws IOException, QueryProcessException, MetadataException;
 
   /** putBack all the memory resources. */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 68776c92c0b..36cbb4e0f88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1974,7 +1974,8 @@ public class TsFileProcessor {
   public void queryForSeriesRegionScanWithoutLock(
       List<IFullPath> pathList,
       QueryContext queryContext,
-      List<IFileScanHandle> fileScanHandlesForQuery) {
+      List<IFileScanHandle> fileScanHandlesForQuery,
+      Filter globalTimeFilter) {
     long startTime = System.nanoTime();
     try {
       Map<IDeviceID, Map<String, List<IChunkHandle>>> 
deviceToMemChunkHandleMap = new HashMap<>();
@@ -1995,7 +1996,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetaList,
                 measurementToChunkHandleList,
-                modsToMemtable);
+                modsToMemtable,
+                globalTimeFilter);
           }
           if (workMemTable != null) {
             workMemTable.queryForSeriesRegionScan(
@@ -2003,7 +2005,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetaList,
                 measurementToChunkHandleList,
-                null);
+                null,
+                globalTimeFilter);
           }
           IDeviceID deviceID = seriesPath.getDeviceId();
           // Some memTable have been flushed already, so we need to get the 
chunk metadata from
@@ -2054,7 +2057,8 @@ public class TsFileProcessor {
   public void queryForDeviceRegionScanWithoutLock(
       Map<IDeviceID, DeviceContext> devicePathsToContext,
       QueryContext queryContext,
-      List<IFileScanHandle> fileScanHandlesForQuery) {
+      List<IFileScanHandle> fileScanHandlesForQuery,
+      Filter globalTimeFilter) {
     long startTime = System.nanoTime();
     try {
       Map<IDeviceID, Map<String, List<IChunkHandle>>> 
deviceToMemChunkHandleMap = new HashMap<>();
@@ -2077,7 +2081,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetadataList,
                 measurementToMemChunkHandleList,
-                modsToMemtable);
+                modsToMemtable,
+                globalTimeFilter);
           }
           if (workMemTable != null) {
             workMemTable.queryForDeviceRegionScan(
@@ -2086,7 +2091,8 @@ public class TsFileProcessor {
                 timeLowerBound,
                 measurementToChunkMetadataList,
                 measurementToMemChunkHandleList,
-                null);
+                null,
+                globalTimeFilter);
           }
 
           buildChunkHandleForFlushedMemTable(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index c4a871a29cc..6499d33fcd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -35,6 +35,7 @@ import org.apache.tsfile.encrypt.EncryptUtils;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -48,10 +49,9 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 
@@ -575,11 +575,30 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
     return sortedList;
   }
 
-  private void filterDeletedTimestamp(
-      TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
-    long lastTime = Long.MIN_VALUE;
+  public Optional<Long> getAnySatisfiedTimestamp(
+      List<TimeRange> deletionList, Filter globalTimeFilter) {
+    Optional<Long> anySatisfiedTimestamp =
+        getAnySatisfiedTimestamp(list, deletionList, globalTimeFilter);
+    if (anySatisfiedTimestamp.isPresent()) {
+      return anySatisfiedTimestamp;
+    }
+    for (TVList tvList : sortedList) {
+      anySatisfiedTimestamp = getAnySatisfiedTimestamp(tvList, deletionList, 
globalTimeFilter);
+      if (anySatisfiedTimestamp.isPresent()) {
+        break;
+      }
+    }
+    return anySatisfiedTimestamp;
+  }
+
+  private Optional<Long> getAnySatisfiedTimestamp(
+      TVList tvlist, List<TimeRange> deletionList, Filter globalTimeFilter) {
     int[] deletionCursor = {0};
     int rowCount = tvlist.rowCount();
+    if (globalTimeFilter != null
+        && !globalTimeFilter.satisfyStartEndTime(tvlist.getMinTime(), 
tvlist.getMaxTime())) {
+      return Optional.empty();
+    }
     for (int i = 0; i < rowCount; i++) {
       if (tvlist.getBitMap() != null && 
tvlist.isNullValue(tvlist.getValueIndex(i))) {
         continue;
@@ -589,27 +608,12 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
           && ModificationUtils.isPointDeleted(curTime, deletionList, 
deletionCursor)) {
         continue;
       }
-
-      if (i == rowCount - 1 || curTime != lastTime) {
-        timestampList.add(curTime);
+      if (globalTimeFilter != null && !globalTimeFilter.satisfy(curTime, 
null)) {
+        continue;
       }
-      lastTime = curTime;
+      return Optional.of(curTime);
     }
-  }
-
-  public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
-    List<Long> timestampList = new ArrayList<>();
-    filterDeletedTimestamp(list, deletionList, timestampList);
-    for (TVList tvList : sortedList) {
-      filterDeletedTimestamp(tvList, deletionList, timestampList);
-    }
-
-    // remove duplicated time
-    List<Long> distinctTimestamps = 
timestampList.stream().distinct().collect(Collectors.toList());
-    // sort timestamps
-    long[] filteredTimestamps = 
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
-    Arrays.sort(filteredTimestamps);
-    return filteredTimestamps;
+    return Optional.empty();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
new file mode 100644
index 00000000000..3967409db52
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.memtable;
+
+import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@RunWith(Parameterized.class)
+public class WritableMemChunkRegionScanTest {
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {{0}, {1000}, {10000}, {20000}});
+  }
+
+  private int defaultTvListThreshold;
+  private int tvListSortThreshold;
+
+  public WritableMemChunkRegionScanTest(int tvListSortThreshold) {
+    this.tvListSortThreshold = tvListSortThreshold;
+  }
+
+  @Before
+  public void setup() {
+    defaultTvListThreshold = 
IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold();
+    
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(tvListSortThreshold);
+  }
+
+  @After
+  public void tearDown() {
+    
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(defaultTvListThreshold);
+  }
+
+  @Test
+  public void testAlignedWritableMemChunkRegionScan() {
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    try {
+      List<IMeasurementSchema> measurementSchemas =
+          Arrays.asList(
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT32),
+              new MeasurementSchema("s3", TSDataType.INT32));
+      AlignedWritableMemChunk writableMemChunk = null;
+      int size = 100000;
+      for (int i = 0; i < size; i++) {
+        if (i <= 10000) {
+          memTable.writeAlignedRow(
+              new StringArrayDeviceID("root.test.d1"),
+              measurementSchemas,
+              i,
+              new Object[] {1, null, 1});
+        } else if (i <= 20000) {
+          memTable.writeAlignedRow(
+              new StringArrayDeviceID("root.test.d1"),
+              measurementSchemas,
+              i,
+              new Object[] {null, null, 2});
+        } else if (i <= 30000) {
+          memTable.writeAlignedRow(
+              new StringArrayDeviceID("root.test.d1"),
+              measurementSchemas,
+              i,
+              new Object[] {3, null, null});
+        } else {
+          memTable.writeAlignedRow(
+              new StringArrayDeviceID("root.test.d1"),
+              measurementSchemas,
+              i,
+              new Object[] {4, 4, 4});
+        }
+      }
+      writableMemChunk =
+          (AlignedWritableMemChunk)
+              memTable.getWritableMemChunk(new 
StringArrayDeviceID("root.test.d1"), "");
+      List<BitMap> bitMaps = new ArrayList<>();
+      long[] timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+              bitMaps,
+              true,
+              null);
+      Assert.assertEquals(2, timestamps.length);
+      Assert.assertEquals(0, timestamps[0]);
+      Assert.assertFalse(bitMaps.get(0).isMarked(0));
+      Assert.assertTrue(bitMaps.get(0).isMarked(1));
+      Assert.assertFalse(bitMaps.get(0).isMarked(2));
+      Assert.assertTrue(bitMaps.get(1).isMarked(0));
+      Assert.assertFalse(bitMaps.get(1).isMarked(1));
+      Assert.assertTrue(bitMaps.get(1).isMarked(2));
+      Assert.assertEquals(30001, timestamps[1]);
+
+      bitMaps = new ArrayList<>();
+      timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(),
+                  Collections.emptyList(),
+                  Collections.singletonList(new TimeRange(0, 12000))),
+              bitMaps,
+              true,
+              new TimeFilterOperators.TimeGt(10000000));
+      Assert.assertEquals(0, timestamps.length);
+
+      bitMaps = new ArrayList<>();
+      timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(),
+                  Collections.emptyList(),
+                  Collections.singletonList(new TimeRange(0, 12000))),
+              bitMaps,
+              true,
+              new TimeFilterOperators.TimeGt(11000));
+
+      Assert.assertEquals(3, timestamps.length);
+      Assert.assertEquals(12001, timestamps[0]);
+      Assert.assertTrue(bitMaps.get(0).isMarked(0));
+      Assert.assertTrue(bitMaps.get(0).isMarked(1));
+      Assert.assertFalse(bitMaps.get(0).isMarked(2));
+      Assert.assertEquals(20001, timestamps[1]);
+      Assert.assertFalse(bitMaps.get(1).isMarked(0));
+      Assert.assertTrue(bitMaps.get(1).isMarked(1));
+      Assert.assertTrue(bitMaps.get(1).isMarked(2));
+      Assert.assertEquals(30001, timestamps[2]);
+      Assert.assertTrue(bitMaps.get(2).isMarked(0));
+      Assert.assertFalse(bitMaps.get(2).isMarked(1));
+      Assert.assertTrue(bitMaps.get(2).isMarked(2));
+
+      writableMemChunk.writeAlignedPoints(
+          1000001, new Object[] {1, null, null}, measurementSchemas);
+      writableMemChunk.writeAlignedPoints(
+          1000002, new Object[] {null, 1, null}, measurementSchemas);
+      writableMemChunk.writeAlignedPoints(1000002, new Object[] {1, 1, null}, 
measurementSchemas);
+      writableMemChunk.writeAlignedPoints(
+          1000003, new Object[] {1, null, null}, measurementSchemas);
+      writableMemChunk.writeAlignedPoints(1000004, new Object[] {1, null, 1}, 
measurementSchemas);
+      bitMaps = new ArrayList<>();
+      timestamps =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Arrays.asList(
+                  Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+              bitMaps,
+              true,
+              new TimeFilterOperators.TimeGt(1000000));
+      Assert.assertEquals(3, timestamps.length);
+      Assert.assertEquals(1000001, timestamps[0]);
+      Assert.assertFalse(bitMaps.get(0).isMarked(0));
+      Assert.assertTrue(bitMaps.get(0).isMarked(1));
+      Assert.assertTrue(bitMaps.get(0).isMarked(2));
+      Assert.assertEquals(1000002, timestamps[1]);
+      Assert.assertTrue(bitMaps.get(1).isMarked(0));
+      Assert.assertFalse(bitMaps.get(1).isMarked(1));
+      Assert.assertTrue(bitMaps.get(1).isMarked(2));
+      Assert.assertEquals(1000004, timestamps[2]);
+      Assert.assertTrue(bitMaps.get(2).isMarked(0));
+      Assert.assertTrue(bitMaps.get(2).isMarked(1));
+      Assert.assertFalse(bitMaps.get(2).isMarked(2));
+
+      Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>();
+      memTable.queryForDeviceRegionScan(
+          new StringArrayDeviceID("root.test.d1"),
+          true,
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1000000));
+      Assert.assertEquals(3, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {1000001, 1000001}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000002, 1000002}, 
chunkHandleMap.get("s2").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000004, 1000004}, 
chunkHandleMap.get("s3").get(0).getPageStatisticsTime());
+
+      memTable.queryForSeriesRegionScan(
+          new AlignedFullPath(
+              new StringArrayDeviceID("root.test.d1"),
+              IMeasurementSchema.getMeasurementNameList(measurementSchemas),
+              measurementSchemas),
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1000000));
+      Assert.assertEquals(3, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {1000001, 1000001}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000002, 1000002}, 
chunkHandleMap.get("s2").get(0).getPageStatisticsTime());
+      Assert.assertArrayEquals(
+          new long[] {1000004, 1000004}, 
chunkHandleMap.get("s3").get(0).getPageStatisticsTime());
+    } finally {
+      memTable.release();
+    }
+  }
+
+  @Test
+  public void testTableWritableMemChunkRegionScan() {
+    List<IMeasurementSchema> measurementSchemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32));
+    AlignedWritableMemChunk writableMemChunk =
+        new AlignedWritableMemChunk(measurementSchemas, true);
+    int size = 100000;
+    for (int i = 0; i < size; i++) {
+      if (i <= 10000) {
+        writableMemChunk.writeAlignedPoints(i, new Object[] {1, null, 1}, 
measurementSchemas);
+      } else if (i <= 20000) {
+        writableMemChunk.writeAlignedPoints(i, new Object[] {null, null, 2}, 
measurementSchemas);
+      } else if (i <= 30000) {
+        writableMemChunk.writeAlignedPoints(i, new Object[] {3, null, null}, 
measurementSchemas);
+      } else {
+        writableMemChunk.writeAlignedPoints(i, new Object[] {4, 4, 4}, 
measurementSchemas);
+      }
+    }
+    List<BitMap> bitMaps = new ArrayList<>();
+    long[] timestamps =
+        writableMemChunk.getAnySatisfiedTimestamp(
+            Arrays.asList(
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+            bitMaps,
+            false,
+            null);
+    Assert.assertEquals(1, timestamps.length);
+    Assert.assertEquals(0, timestamps[0]);
+    Assert.assertFalse(bitMaps.get(0).isMarked(0));
+    Assert.assertTrue(bitMaps.get(0).isMarked(1));
+    Assert.assertFalse(bitMaps.get(0).isMarked(2));
+
+    bitMaps = new ArrayList<>();
+    timestamps =
+        writableMemChunk.getAnySatisfiedTimestamp(
+            Arrays.asList(
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.singletonList(new TimeRange(0, 12000))),
+            bitMaps,
+            false,
+            new TimeFilterOperators.TimeGt(11000));
+
+    Assert.assertEquals(1, timestamps.length);
+    Assert.assertEquals(11001, timestamps[0]);
+    Assert.assertTrue(bitMaps.get(0).isMarked(0));
+    Assert.assertTrue(bitMaps.get(0).isMarked(1));
+    Assert.assertTrue(bitMaps.get(0).isMarked(2));
+  }
+
+  @Test
+  public void testNonAlignedWritableMemChunkRegionScan() {
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    try {
+      MeasurementSchema measurementSchema = new MeasurementSchema("s1", 
TSDataType.INT32);
+      int size = 100000;
+      for (int i = 0; i < size; i++) {
+        memTable.write(
+            new StringArrayDeviceID("root.test.d1"),
+            Collections.singletonList(measurementSchema),
+            i,
+            new Object[] {i});
+      }
+      WritableMemChunk writableMemChunk =
+          (WritableMemChunk)
+              memTable.getWritableMemChunk(new 
StringArrayDeviceID("root.test.d1"), "s1");
+      Optional<Long> timestamp = 
writableMemChunk.getAnySatisfiedTimestamp(null, null);
+      Assert.assertTrue(timestamp.isPresent());
+      Assert.assertEquals(0, timestamp.get().longValue());
+
+      timestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              null, new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L));
+      Assert.assertTrue(timestamp.isPresent());
+      Assert.assertEquals(1000, timestamp.get().longValue());
+
+      timestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Collections.singletonList(new TimeRange(1, 1500)),
+              new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L));
+      Assert.assertTrue(timestamp.isPresent());
+      Assert.assertEquals(1501, timestamp.get().longValue());
+
+      timestamp =
+          writableMemChunk.getAnySatisfiedTimestamp(
+              Collections.singletonList(new TimeRange(1, 1500)),
+              new TimeFilterOperators.TimeBetweenAnd(100000L, 200000L));
+      Assert.assertFalse(timestamp.isPresent());
+
+      Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>();
+      memTable.queryForDeviceRegionScan(
+          new StringArrayDeviceID("root.test.d1"),
+          false,
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1));
+      Assert.assertEquals(1, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {2, 2}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+      memTable.queryForSeriesRegionScan(
+          new NonAlignedFullPath(new StringArrayDeviceID("root.test.d1"), 
measurementSchema),
+          Long.MIN_VALUE,
+          new HashMap<>(),
+          chunkHandleMap,
+          Collections.emptyList(),
+          new TimeFilterOperators.TimeGt(1));
+      Assert.assertEquals(1, chunkHandleMap.size());
+      Assert.assertArrayEquals(
+          new long[] {2, 2}, 
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+    } finally {
+      memTable.release();
+    }
+  }
+}


Reply via email to