This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch fixMemTableQueryBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fb03dda1a10302ecc781314d05a0df1eb683c0b0
Author: shuwenwei <[email protected]>
AuthorDate: Mon Dec 29 17:53:59 2025 +0800

    try fix
---
 .../execution/operator/source/SeriesScanUtil.java  | 108 +++++++++++++++------
 1 file changed, 77 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index e2fbb4be4ec..5da3a9c315f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -890,7 +890,17 @@ public class SeriesScanUtil implements Accountable {
         return true;
       }
 
-      tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+      // init the merge reader for current call
+      // The original process is changed to lazy loading because different mem 
page readers
+      // belonging to the same mem chunk need to be read in a streaming 
manner. Therefore, it is
+      // necessary to ensure that these mem page readers cannot coexist in the 
mergeReader at the
+      // same time.
+      // The initial endPointTime is calculated as follows:
+      // 1. If mergeReader is empty, use the endpoint of firstPageReader to 
find all overlapped
+      // unseq pages and take the end point.
+      // 2. If mergeReader is not empty, use the readStopTime of mergeReader 
to find all overlapping
+      // unseq pages and take the end point.
+      long initialEndPointTime = 
tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
 
       while (true) {
 
@@ -898,7 +908,8 @@ public class SeriesScanUtil implements Accountable {
         if (mergeReader.hasNextTimeValuePair()) {
 
           TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
-          long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+          long currentPageEndPointTime =
+              Math.max(mergeReader.getCurrentReadStopTime(), 
initialEndPointTime);
           while (mergeReader.hasNextTimeValuePair()) {
 
             /*
@@ -928,7 +939,7 @@ public class SeriesScanUtil implements Accountable {
             unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
                 timeValuePair.getTimestamp(), false);
             
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), 
false);
-            
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+            unpackAllOverlappedUnseqPageReadersToMergeReader();
 
             // update if there are unpacked unSeqPageReaders
             timeValuePair = mergeReader.currentTimeValuePair();
@@ -1017,33 +1028,59 @@ public class SeriesScanUtil implements Accountable {
     }
   }
 
-  private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() 
throws IOException {
+  private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() 
throws IOException {
+    do {
+      /*
+       * no cached page readers
+       */
+      if (firstPageReader == null && unSeqPageReaders.isEmpty() && 
seqPageReaders.isEmpty()) {
+        return mergeReader.getCurrentReadStopTime();
+      }
 
-    /*
-     * no cached page readers
-     */
-    if (firstPageReader == null && unSeqPageReaders.isEmpty() && 
seqPageReaders.isEmpty()) {
-      return;
-    }
+      /*
+       * init firstPageReader
+       */
+      if (firstPageReader == null) {
+        initFirstPageReader();
+      }
+      putPageReaderToMergeReader(firstPageReader);
+      firstPageReader = null;
+    } while (!mergeReader.hasNextTimeValuePair());
 
     /*
-     * init firstPageReader
+     * put all currently directly overlapped unseq page reader to merge reader
      */
-    if (firstPageReader == null) {
-      initFirstPageReader();
-    }
+    long mergeReaderStopTime = mergeReader.getCurrentReadStopTime();
+    unpackAllOverlappedUnseqPageReadersToMergeReader();
 
-    long currentPageEndpointTime;
-    if (mergeReader.hasNextTimeValuePair()) {
-      currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
-    } else {
-      currentPageEndpointTime = 
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
-    }
+    return calculateInitialEndPointTime(mergeReaderStopTime);
+  }
 
-    /*
-     * put all currently directly overlapped unseq page reader to merge reader
-     */
-    unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
+  /**
+   * Extends {@code currentReadStopTime} to cover the cached unseq page readers that overlap it:
+   * {@code firstPageReader} when it is non-null and unseq, and, of {@code unSeqPageReaders}, only
+   * the first reader yielded by iteration (the loop below always breaks after one pass).
+   *
+   * <p>NOTE(review): in descending order this takes {@code min(stopTime, endTime)}; confirm that
+   * the page start time is not what is intended there.
+   * @return the read stop time after taking those overlapping readers into account
+   */
+  private long calculateInitialEndPointTime(long currentReadStopTime) {
+    if (firstPageReader != null
+        && !firstPageReader.isSeq()
+        && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) {
+      long endTime = firstPageReader.getStatistics().getEndTime();
+      currentReadStopTime =
+          orderUtils.getAscending()
+              ? Math.max(currentReadStopTime, endTime)
+              : Math.min(currentReadStopTime, endTime);
+    }
+    for (IVersionPageReader unSeqPageReader : unSeqPageReaders) {
+      if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) {
+        // Read the end time from the reader being examined: firstPageReader may be null here.
+        long endTime = unSeqPageReader.getStatistics().getEndTime();
+        currentReadStopTime =
+            orderUtils.getAscending()
+                ? Math.max(currentReadStopTime, endTime)
+                : Math.min(currentReadStopTime, endTime);
+      }
+      break;
+    }
+    return currentReadStopTime;
   }
 
   private void addTimeValuePairToResult(TimeValuePair timeValuePair, 
TsBlockBuilder builder) {
@@ -1135,17 +1172,26 @@ public class SeriesScanUtil implements Accountable {
     return firstPageReader;
   }
 
-  private void unpackAllOverlappedUnseqPageReadersToMergeReader(long 
endpointTime)
-      throws IOException {
-    while (!unSeqPageReaders.isEmpty()
-        && orderUtils.isOverlapped(endpointTime, 
unSeqPageReaders.peek().getStatistics())) {
-      putPageReaderToMergeReader(unSeqPageReaders.poll());
-    }
+  // This process loads overlapped unseq pages based on the current time value 
pair of the
+  // mergeReader. The current time value pair of the mergeReader is 
recalculated each time an unseq
+  // page is added.
+  // The current time obtained from mergeReader each time is not necessarily 
the minimum among all
+  // the actual unseq data, so it is necessary to repeatedly calculate and 
include potentially
+  // overlapping unseq pages.
+  private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws 
IOException {
+    long actualFirstTimeOfMergeReader = 
mergeReader.currentTimeValuePair().getTimestamp();
     if (firstPageReader != null
         && !firstPageReader.isSeq()
-        && orderUtils.isOverlapped(endpointTime, 
firstPageReader.getStatistics())) {
+        && orderUtils.isOverlapped(actualFirstTimeOfMergeReader, 
firstPageReader.getStatistics())) {
       putPageReaderToMergeReader(firstPageReader);
       firstPageReader = null;
+      actualFirstTimeOfMergeReader = 
mergeReader.currentTimeValuePair().getTimestamp();
+    }
+    while (!unSeqPageReaders.isEmpty()
+        && orderUtils.isOverlapped(
+            actualFirstTimeOfMergeReader, 
unSeqPageReaders.peek().getStatistics())) {
+      putPageReaderToMergeReader(unSeqPageReaders.poll());
+      actualFirstTimeOfMergeReader = 
mergeReader.currentTimeValuePair().getTimestamp();
     }
   }
 

Reply via email to