This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9478b77da3d fix mem table query bug #16964
9478b77da3d is described below
commit 9478b77da3df8879eae99c599aba3b6387496525
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 30 15:06:18 2025 +0800
fix mem table query bug #16964
---
.../org/apache/iotdb/db/it/IoTDBFlushQueryIT.java | 82 ++++++++++++++
.../execution/operator/source/SeriesScanUtil.java | 122 +++++++++++++++------
2 files changed, 173 insertions(+), 31 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
index 6253e618eda..b1a5a3d1562 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
@@ -19,12 +19,21 @@
package org.apache.iotdb.db.it;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -34,6 +43,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
import java.util.Locale;
import static org.junit.Assert.assertEquals;
@@ -181,4 +192,75 @@ public class IoTDBFlushQueryIT {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testStreamingQueryMemTableWithOverlappedData()
+ throws IoTDBConnectionException, StatementExecutionException {
+ String device = "root.stream1.d1";
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.open();
+ generateTimeRangeWithTimestamp(session, device, 1, 10);
+
+ generateTimeRangeWithTimestamp(session, device, 500000, 510000);
+ session.executeNonQueryStatement("flush");
+ generateTimeRangeWithTimestamp(session, device, 100000, 350000);
+
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select count(*) from root.stream1.d1");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ long count = 0;
+ while (iterator.next()) {
+ count = iterator.getLong(1);
+ }
+ Assert.assertEquals(10 + 10001 + 250001, count);
+ }
+ }
+
+ @Test
+ public void testStreamingQueryMemTableWithOverlappedData2()
+ throws IoTDBConnectionException, StatementExecutionException {
+ String device = "root.stream2.d1";
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.open();
+ generateTimeRangeWithTimestamp(session, device, 1, 10);
+
+ generateTimeRangeWithTimestamp(session, device, 500000, 510000);
+ session.executeNonQueryStatement("flush");
+ generateTimeRangeWithTimestamp(session, device, 1, 20);
+ generateTimeRangeWithTimestamp(session, device, 100000, 210000);
+ session.executeNonQueryStatement("flush");
+
+ generateTimeRangeWithTimestamp(session, device, 150000, 450000);
+
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select count(*) from root.stream2.d1");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ long count = 0;
+ while (iterator.next()) {
+ count = iterator.getLong(1);
+ }
+ Assert.assertEquals(20 + 10001 + 350001, count);
+ }
+ }
+
+ private static void generateTimeRangeWithTimestamp(
+ ISession session, String device, long start, long end)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<IMeasurementSchema> measurementSchemas =
+ Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64));
+ Tablet tablet = new Tablet(device, measurementSchemas);
+ for (long currentTime = start; currentTime <= end; currentTime++) {
+ int rowIndex = tablet.getRowSize();
+ if (rowIndex == tablet.getMaxRowNumber()) {
+ session.insertTablet(tablet);
+ tablet.reset();
+ rowIndex = 0;
+ }
+ tablet.addTimestamp(rowIndex, currentTime);
+ tablet.addValue(rowIndex, 0, currentTime);
+ }
+ if (tablet.getRowSize() > 0) {
+ session.insertTablet(tablet);
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index e2fbb4be4ec..74196196104 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -890,7 +890,17 @@ public class SeriesScanUtil implements Accountable {
return true;
}
- tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+ // init the merge reader for current call
+ // The original process is changed to lazy loading because different mem page readers
+ // belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is
+ // necessary to ensure that these mem page readers cannot coexist in the mergeReader at the
+ // same time.
+ // The initial endPointTime is calculated as follows:
+ // 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped
+ // unseq pages and take the end point.
+ // 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping
+ // unseq pages and take the end point.
+ long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
while (true) {
@@ -898,7 +908,10 @@ public class SeriesScanUtil implements Accountable {
if (mergeReader.hasNextTimeValuePair()) {
TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
- long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+ long currentPageEndPointTime =
+ orderUtils.getAscending()
+ ? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime)
+ : Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime);
while (mergeReader.hasNextTimeValuePair()) {
/*
@@ -928,7 +941,7 @@ public class SeriesScanUtil implements Accountable {
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
timeValuePair.getTimestamp(), false);
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
- unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+ unpackAllOverlappedUnseqPageReadersToMergeReader();
// update if there are unpacked unSeqPageReaders
timeValuePair = mergeReader.currentTimeValuePair();
@@ -1017,33 +1030,71 @@ public class SeriesScanUtil implements Accountable {
}
}
- private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
+ private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
+ do {
+ /*
+ * no cached page readers
+ */
+ if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
+ return mergeReader.getCurrentReadStopTime();
+ }
- /*
- * no cached page readers
- */
- if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
- return;
- }
+ /*
+ * init firstPageReader
+ */
+ if (firstPageReader == null) {
+ initFirstPageReader();
+ }
+ if (!mergeReader.hasNextTimeValuePair()) {
+ putPageReaderToMergeReader(firstPageReader);
+ firstPageReader = null;
+ }
+ } while (!mergeReader.hasNextTimeValuePair());
/*
- * init firstPageReader
+ * put all currently directly overlapped unseq page reader to merge reader
*/
- if (firstPageReader == null) {
- initFirstPageReader();
- }
+ long mergeReaderStopTime = mergeReader.getCurrentReadStopTime();
+ unpackAllOverlappedUnseqPageReadersToMergeReader();
- long currentPageEndpointTime;
- if (mergeReader.hasNextTimeValuePair()) {
- currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
- } else {
- currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
- }
+ return calculateInitialEndPointTime(mergeReaderStopTime);
+ }
- /*
- * put all currently directly overlapped unseq page reader to merge reader
- */
- unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
+ private long calculateInitialEndPointTime(final long currentReadStopTime) {
+ long initialReadStopTime = currentReadStopTime;
+ if (firstPageReader != null
+ && !firstPageReader.isSeq()
+ && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) {
+ if (orderUtils.getAscending()) {
+ initialReadStopTime =
+ Math.max(
+ initialReadStopTime,
+ orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+ } else {
+ initialReadStopTime =
+ Math.min(
+ initialReadStopTime,
+ orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+ }
+ }
+ for (IVersionPageReader unSeqPageReader : unSeqPageReaders) {
+ if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) {
+ if (orderUtils.getAscending()) {
+ initialReadStopTime =
+ Math.max(
+ initialReadStopTime,
+ orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
+ } else {
+ initialReadStopTime =
+ Math.min(
+ initialReadStopTime,
+ orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
+ }
+ } else {
+ break;
+ }
+ }
+ return initialReadStopTime;
}
private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {
@@ -1135,17 +1186,26 @@ public class SeriesScanUtil implements Accountable {
return firstPageReader;
}
- private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
- throws IOException {
- while (!unSeqPageReaders.isEmpty()
- && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) {
- putPageReaderToMergeReader(unSeqPageReaders.poll());
- }
+ // This process loads overlapped unseq pages based on the current time value pair of the
+ // mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq
+ // page is added.
+ // The current time obtained from mergeReader each time is not necessarily the minimum among all
+ // the actual unseq data, so it is necessary to repeatedly calculate and include potentially
+ // overlapping unseq pages.
+ private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException {
+ long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
if (firstPageReader != null
&& !firstPageReader.isSeq()
- && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
+ && orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) {
putPageReaderToMergeReader(firstPageReader);
firstPageReader = null;
+ actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
+ }
+ while (!unSeqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(
+ actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) {
+ putPageReaderToMergeReader(unSeqPageReaders.poll());
+ actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
}
}