This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9478b77da3d fix mem table query bug #16964
9478b77da3d is described below

commit 9478b77da3df8879eae99c599aba3b6387496525
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 30 15:06:18 2025 +0800

    fix mem table query bug #16964
---
 .../org/apache/iotdb/db/it/IoTDBFlushQueryIT.java  |  82 ++++++++++++++
 .../execution/operator/source/SeriesScanUtil.java  | 122 +++++++++++++++------
 2 files changed, 173 insertions(+), 31 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
index 6253e618eda..b1a5a3d1562 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
@@ -19,12 +19,21 @@
 
 package org.apache.iotdb.db.it;
 
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -34,6 +43,8 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
@@ -181,4 +192,75 @@ public class IoTDBFlushQueryIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testStreamingQueryMemTableWithOverlappedData()
+      throws IoTDBConnectionException, StatementExecutionException {
+    String device = "root.stream1.d1";
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      session.open();
+      generateTimeRangeWithTimestamp(session, device, 1, 10);
+
+      generateTimeRangeWithTimestamp(session, device, 500000, 510000);
+      session.executeNonQueryStatement("flush");
+      generateTimeRangeWithTimestamp(session, device, 100000, 350000);
+
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement("select count(*) from root.stream1.d1");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      long count = 0;
+      while (iterator.next()) {
+        count = iterator.getLong(1);
+      }
+      Assert.assertEquals(10 + 10001 + 250001, count);
+    }
+  }
+
+  @Test
+  public void testStreamingQueryMemTableWithOverlappedData2()
+      throws IoTDBConnectionException, StatementExecutionException {
+    String device = "root.stream2.d1";
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      session.open();
+      generateTimeRangeWithTimestamp(session, device, 1, 10);
+
+      generateTimeRangeWithTimestamp(session, device, 500000, 510000);
+      session.executeNonQueryStatement("flush");
+      generateTimeRangeWithTimestamp(session, device, 1, 20);
+      generateTimeRangeWithTimestamp(session, device, 100000, 210000);
+      session.executeNonQueryStatement("flush");
+
+      generateTimeRangeWithTimestamp(session, device, 150000, 450000);
+
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement("select count(*) from root.stream2.d1");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      long count = 0;
+      while (iterator.next()) {
+        count = iterator.getLong(1);
+      }
+      Assert.assertEquals(20 + 10001 + 350001, count);
+    }
+  }
+
+  private static void generateTimeRangeWithTimestamp(
+      ISession session, String device, long start, long end)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<IMeasurementSchema> measurementSchemas =
+        Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64));
+    Tablet tablet = new Tablet(device, measurementSchemas);
+    for (long currentTime = start; currentTime <= end; currentTime++) {
+      int rowIndex = tablet.getRowSize();
+      if (rowIndex == tablet.getMaxRowNumber()) {
+        session.insertTablet(tablet);
+        tablet.reset();
+        rowIndex = 0;
+      }
+      tablet.addTimestamp(rowIndex, currentTime);
+      tablet.addValue(rowIndex, 0, currentTime);
+    }
+    if (tablet.getRowSize() > 0) {
+      session.insertTablet(tablet);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index e2fbb4be4ec..74196196104 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -890,7 +890,17 @@ public class SeriesScanUtil implements Accountable {
         return true;
       }
 
-      tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+      // init the merge reader for current call
+      // The original process is changed to lazy loading because different mem page readers
+      // belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is
+      // necessary to ensure that these mem page readers cannot coexist in the mergeReader at the
+      // same time.
+      // The initial endPointTime is calculated as follows:
+      // 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped
+      // unseq pages and take the end point.
+      // 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping
+      // unseq pages and take the end point.
+      long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
 
       while (true) {
 
@@ -898,7 +908,10 @@ public class SeriesScanUtil implements Accountable {
         if (mergeReader.hasNextTimeValuePair()) {
 
           TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
-          long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+          long currentPageEndPointTime =
+              orderUtils.getAscending()
+                  ? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime)
+                  : Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime);
           while (mergeReader.hasNextTimeValuePair()) {
 
             /*
@@ -928,7 +941,7 @@ public class SeriesScanUtil implements Accountable {
             unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
                 timeValuePair.getTimestamp(), false);
             unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
-            unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+            unpackAllOverlappedUnseqPageReadersToMergeReader();
 
             // update if there are unpacked unSeqPageReaders
             timeValuePair = mergeReader.currentTimeValuePair();
@@ -1017,33 +1030,71 @@ public class SeriesScanUtil implements Accountable {
     }
   }
 
-  private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
+  private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
+    do {
+      /*
+       * no cached page readers
+       */
+      if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
+        return mergeReader.getCurrentReadStopTime();
+      }
 
-    /*
-     * no cached page readers
-     */
-    if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
-      return;
-    }
+      /*
+       * init firstPageReader
+       */
+      if (firstPageReader == null) {
+        initFirstPageReader();
+      }
+      if (!mergeReader.hasNextTimeValuePair()) {
+        putPageReaderToMergeReader(firstPageReader);
+        firstPageReader = null;
+      }
+    } while (!mergeReader.hasNextTimeValuePair());
 
     /*
-     * init firstPageReader
+     * put all currently directly overlapped unseq page reader to merge reader
      */
-    if (firstPageReader == null) {
-      initFirstPageReader();
-    }
+    long mergeReaderStopTime = mergeReader.getCurrentReadStopTime();
+    unpackAllOverlappedUnseqPageReadersToMergeReader();
 
-    long currentPageEndpointTime;
-    if (mergeReader.hasNextTimeValuePair()) {
-      currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
-    } else {
-      currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
-    }
+    return calculateInitialEndPointTime(mergeReaderStopTime);
+  }
 
-    /*
-     * put all currently directly overlapped unseq page reader to merge reader
-     */
-    unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
+  private long calculateInitialEndPointTime(final long currentReadStopTime) {
+    long initialReadStopTime = currentReadStopTime;
+    if (firstPageReader != null
+        && !firstPageReader.isSeq()
+        && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) {
+      if (orderUtils.getAscending()) {
+        initialReadStopTime =
+            Math.max(
+                initialReadStopTime,
+                orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+      } else {
+        initialReadStopTime =
+            Math.min(
+                initialReadStopTime,
+                orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+      }
+    }
+    for (IVersionPageReader unSeqPageReader : unSeqPageReaders) {
+      if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) {
+        if (orderUtils.getAscending()) {
+          initialReadStopTime =
+              Math.max(
+                  initialReadStopTime,
+                  orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
+        } else {
+          initialReadStopTime =
+              Math.min(
+                  initialReadStopTime,
+                  orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
+        }
+      } else {
+        break;
+      }
+    }
+    return initialReadStopTime;
   }
 
   private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {
@@ -1135,17 +1186,26 @@ public class SeriesScanUtil implements Accountable {
     return firstPageReader;
   }
 
-  private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
-      throws IOException {
-    while (!unSeqPageReaders.isEmpty()
-        && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) {
-      putPageReaderToMergeReader(unSeqPageReaders.poll());
-    }
+  // This process loads overlapped unseq pages based on the current time value pair of the
+  // mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq
+  // page is added.
+  // The current time obtained from mergeReader each time is not necessarily the minimum among all
+  // the actual unseq data, so it is necessary to repeatedly calculate and include potentially
+  // overlapping unseq pages.
+  private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException {
+    long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
     if (firstPageReader != null
         && !firstPageReader.isSeq()
-        && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
+        && orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) {
       putPageReaderToMergeReader(firstPageReader);
       firstPageReader = null;
+      actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
+    }
+    while (!unSeqPageReaders.isEmpty()
+        && orderUtils.isOverlapped(
+            actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) {
+      putPageReaderToMergeReader(unSeqPageReaders.poll());
+      actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
     }
   }
 

Reply via email to