This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixMemTableQueryBug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fb03dda1a10302ecc781314d05a0df1eb683c0b0 Author: shuwenwei <[email protected]> AuthorDate: Mon Dec 29 17:53:59 2025 +0800 try fix --- .../execution/operator/source/SeriesScanUtil.java | 108 +++++++++++++++------ 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index e2fbb4be4ec..5da3a9c315f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -890,7 +890,17 @@ public class SeriesScanUtil implements Accountable { return true; } - tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); + // init the merge reader for current call + // The original process is changed to lazy loading because different mem page readers + // belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is + // necessary to ensure that these mem page readers cannot coexist in the mergeReader at the + // same time. + // The initial endPointTime is calculated as follows: + // 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped + // unseq pages and take the end point. + // 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping + // unseq pages and take the end point. + long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); while (true) { @@ -898,7 +908,8 @@ public class SeriesScanUtil implements Accountable { if (mergeReader.hasNextTimeValuePair()) { TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); - long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); + long currentPageEndPointTime = + Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime); while (mergeReader.hasNextTimeValuePair()) { /* @@ -928,7 +939,7 @@ public class SeriesScanUtil implements Accountable { unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( timeValuePair.getTimestamp(), false); unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); - unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); + unpackAllOverlappedUnseqPageReadersToMergeReader(); // update if there are unpacked unSeqPageReaders timeValuePair = mergeReader.currentTimeValuePair(); @@ -1017,33 +1028,59 @@ public class SeriesScanUtil implements Accountable { } } - private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { + private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { + do { + /* + * no cached page readers + */ + if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { + return mergeReader.getCurrentReadStopTime(); + } - /* - * no cached page readers - */ - if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { - return; - } + /* + * init firstPageReader + */ + if (firstPageReader == null) { + initFirstPageReader(); + } + putPageReaderToMergeReader(firstPageReader); + firstPageReader = null; + } while (!mergeReader.hasNextTimeValuePair()); /* - * init firstPageReader + * put all currently directly overlapped unseq page reader to merge reader */ - if (firstPageReader == null) { - initFirstPageReader(); - } + long mergeReaderStopTime = mergeReader.getCurrentReadStopTime(); + unpackAllOverlappedUnseqPageReadersToMergeReader(); - long currentPageEndpointTime; - if (mergeReader.hasNextTimeValuePair()) { - currentPageEndpointTime = mergeReader.getCurrentReadStopTime(); - } else { - currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); - } + return calculateInitialEndPointTime(mergeReaderStopTime); + } - /* - * put all currently directly overlapped unseq page reader to merge reader - */ - unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime); + private long calculateInitialEndPointTime(long currentReadStopTime) { + if (firstPageReader != null + && !firstPageReader.isSeq() + && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) { + if (orderUtils.getAscending()) { + currentReadStopTime = + Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } else { + currentReadStopTime = + Math.min(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } + } + for (IVersionPageReader unSeqPageReader : unSeqPageReaders) { + if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) { + if (orderUtils.getAscending()) { + currentReadStopTime = + Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } else { + currentReadStopTime = + Math.min(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } + } + break; + } + return currentReadStopTime; } private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) { @@ -1135,17 +1172,26 @@ public class SeriesScanUtil implements Accountable { return firstPageReader; } - private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime) - throws IOException { - while (!unSeqPageReaders.isEmpty() - && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) { - putPageReaderToMergeReader(unSeqPageReaders.poll()); - } + // This process loads overlapped unseq pages based on the current time value pair of the + // mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq + // page is added. + // The current time obtained from mergeReader each time is not necessarily the minimum among all + // the actual unseq data, so it is necessary to repeatedly calculate and include potentially + // overlapping unseq pages. + private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException { + long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); if (firstPageReader != null && !firstPageReader.isSeq() - && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) { + && orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) { putPageReaderToMergeReader(firstPageReader); firstPageReader = null; + actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); + } + while (!unSeqPageReaders.isEmpty() + && orderUtils.isOverlapped( + actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) { + putPageReaderToMergeReader(unSeqPageReaders.poll()); + actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); } }
