This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1d3578026d8 [to dev/1.3] Fix query reuse of flushing memtable TVList (#17709)
1d3578026d8 is described below
commit 1d3578026d8e19d5801f4d47fe0a9e03c141678b
Author: shuwenwei <[email protected]>
AuthorDate: Tue May 19 14:27:43 2026 +0800
[to dev/1.3] Fix query reuse of flushing memtable TVList (#17709)
---
.../execution/fragment/QueryContext.java | 4 +-
.../schemaregion/utils/ResourceByPathUtils.java | 40 ++++++++++++++-
.../dataregion/flush/MemTableFlushTask.java | 1 -
.../memtable/AbstractWritableMemChunk.java | 20 ++++++--
.../memtable/AlignedReadOnlyMemChunk.java | 2 +-
.../dataregion/memtable/IWritableMemChunk.java | 3 ++
.../dataregion/memtable/ReadOnlyMemChunk.java | 2 +-
.../fragment/FragmentInstanceExecutionTest.java | 6 +--
.../dataregion/memtable/PrimitiveMemTableTest.java | 59 +++++++++++++++++++++-
9 files changed, 123 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index 8d62e207766..d1b235ff7b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -217,7 +217,7 @@ public class QueryContext {
this.queryStatistics = queryStatistics;
}
- public void addTVListToSet(Map<TVList, Integer> tvListMap) {
- tvListSet.addAll(tvListMap.keySet());
+ public void addTVListToSet(Set<TVList> set) {
+ tvListSet.addAll(set);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index c64d32c1e32..fa2f603d6fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -158,10 +158,46 @@ public abstract class ResourceByPathUtils {
}
if (!isWorkMemTable) {
+ /*
+ * 1. Q1 queries this TVList while it is still in the working memtable and records a smaller
+ * visible row count.
+ * 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the
+ * memtable to the flushing list.
+ * 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList,
+ * Q2's query-side sort may reorder the indices in place.
+ * 4. Q1 continues to read with its old row count and the reordered indices. The converted
+ * value index can exceed Q1's bitmap range and cause out-of-bound access.
+ *
+ * Therefore, this flushing branch can reuse the original list only when it is already
+ * sorted or no active query is using it. Otherwise, Q2 should read from
+ * workingListForFlush.
+ */
+ boolean canUseListDirectly = list.isSorted() ||
list.getQueryContextSet().isEmpty();
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's
query list");
- list.getQueryContextSet().add(context);
- tvListQueryMap.put(list, list.rowCount());
+ if (canUseListDirectly) {
+ list.getQueryContextSet().add(context);
+ tvListQueryMap.put(list, list.rowCount());
+ } else {
+ TVList workingListForFlushSort =
memChunk.initWorkingListForFlushIfNecessary(list, true);
+ /*
+ * The query will read from workingListForFlushSort, but cloneForFlushSort() only clones
+ * times and indices. The value arrays and bitmaps are still shared with the original
+ * list.
+ *
+ * Therefore, this query must also hold the original list until it finishes. Adding
+ * context to list.getQueryContextSet() lets flush/query cleanup see that the original
+ * list is still in use. Adding list to context.tvListSet makes
+ * releaseTVListOwnedByQuery() remove this context from the original list later.
+ *
+ * Do not put the original list into tvListQueryMap here. The actual read path must use
+ * workingListForFlushSort to avoid sorting the original list in place.
+ */
+ list.getQueryContextSet().add(context);
+ context.addTVListToSet(Collections.singleton(list));
+ workingListForFlushSort.getQueryContextSet().add(context);
+ tvListQueryMap.put(workingListForFlushSort,
workingListForFlushSort.rowCount());
+ }
} else {
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 0c5ccc9ac1b..48efe55c223 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -278,7 +278,6 @@ public class MemTableFlushTask {
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
- writableMemChunk.releaseTemporaryTvListForFlush();
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK,
subTaskTime);
memSerializeTime += subTaskTime;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index 5167ea96b56..6c773942fb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -206,8 +206,9 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
/*
* Concurrency background:
*
- * A query may start earlier and record the current row count (rows) of the TVList as its visible range.
- * After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
+ * A query may start earlier and record the current row count (rows) of the TVList as its
+ * visible range. After that, new unseq writes may arrive and immediately trigger a flush, which
+ * will sort the TVList.
*
* During sorting, the underlying indices array of the TVList may be reordered.
* If the query continues to use the previously recorded rows as its upper bound,
@@ -219,6 +220,9 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
* To avoid this issue, when there are active queries on the working TVList, we must
* clone the times and indices before sorting, so that the flush sort does not mutate
* the data structures that concurrent queries rely on.
+ *
+ * Flushing-memtable queries may also reuse workingListForFlush instead of the original working
+ * TVList for the same reason.
*/
boolean needCloneTimesAndIndicesInWorkingTVList;
workingList.lockQueryList();
@@ -228,7 +232,7 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
workingList.unlockQueryList();
}
workingListForFlush =
- needCloneTimesAndIndicesInWorkingTVList ?
workingList.cloneForFlushSort() : workingList;
+ initWorkingListForFlushIfNecessary(workingList,
needCloneTimesAndIndicesInWorkingTVList);
workingListForFlush.sort();
}
@@ -267,4 +271,14 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
@Override
public abstract int serializedSize();
+
+ @Override
+ public synchronized TVList initWorkingListForFlushIfNecessary(
+ TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) {
+ if (workingListForFlush == null) {
+ workingListForFlush =
+ needCloneTimesAndIndicesInWorkingTVList ?
workingList.cloneForFlushSort() : workingList;
+ }
+ return workingListForFlush;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index 30976fb6790..bb2ee311d30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -104,7 +104,7 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
this.valueStatisticsList = new ArrayList<>();
this.alignedTvListQueryMap = alignedTvListQueryMap;
this.columnIndexList = columnIndexList;
- this.context.addTVListToSet(alignedTvListQueryMap);
+ this.context.addTVListToSet(alignedTvListQueryMap.keySet());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 99404502437..376136e1d12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -126,4 +126,7 @@ public interface IWritableMemChunk extends WALEntryValue {
TVList getWorkingTVList();
void setWorkingTVList(TVList list);
+
+ TVList initWorkingListForFlushIfNecessary(
+ TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 9e46740a759..c0a71bf7edc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -127,7 +127,7 @@ public class ReadOnlyMemChunk {
this.deletionList = deletionList;
this.tvListQueryMap = tvListQueryMap;
this.pageStatisticsList = new ArrayList<>();
- this.context.addTVListToSet(tvListQueryMap);
+ this.context.addTVListToSet(tvListQueryMap.keySet());
}
public void sortTvLists() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 22e5c360ae3..cfc7f887dcf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -44,7 +44,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -133,13 +133,13 @@ public class FragmentInstanceExecutionTest {
FragmentInstanceExecution execution1 =
createFragmentInstanceExecution(1, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext1 =
execution1.getFragmentInstanceContext();
- fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+ fragmentInstanceContext1.addTVListToSet(ImmutableSet.of(tvList));
tvList.getQueryContextSet().add(fragmentInstanceContext1);
FragmentInstanceExecution execution2 =
createFragmentInstanceExecution(2, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext2 =
execution2.getFragmentInstanceContext();
- fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+ fragmentInstanceContext2.addTVListToSet(ImmutableSet.of(tvList));
tvList.getQueryContextSet().add(fragmentInstanceContext2);
// mock flush's behavior
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index b366a48ab77..b9259db0a0f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -222,6 +222,63 @@ public class PrimitiveMemTableTest {
}
}
+ @Test
+ public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery()
+ throws QueryProcessException, IOException, IllegalPathException {
+
+ PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+ List<IMeasurementSchema> measurementSchemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32));
+ PlainDeviceID deviceID = new PlainDeviceID("root.test.d1");
+ for (int i = 1000; i < 2000; i++) {
+ memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[]
{i, i, i});
+ }
+
+ ResourceByPathUtils resourcesByPathUtils =
+ ResourceByPathUtils.getResourceInstance(
+ new AlignedPath("root.test.d1", Arrays.asList("s1", "s2", "s3"),
measurementSchemas));
+ AlignedReadOnlyMemChunk firstQueryMemChunk =
+ (AlignedReadOnlyMemChunk)
+ resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+ new QueryContext(1), memTable, null, Long.MAX_VALUE, null);
+ TVList originalWorkingList = memTable.getWritableMemChunk(deviceID,
"").getWorkingTVList();
+ Assert.assertSame(
+ originalWorkingList,
+
firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next());
+
+ for (int i = 1; i <= 50; i++) {
+ memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[]
{i, i, i});
+ }
+ MeasurementPath path = new MeasurementPath("root.test.d1.s1",
TSDataType.INT32);
+ memTable.delete(path, path.getDevicePath(), 1, 10);
+ path = new MeasurementPath("root.test.d1.s2", TSDataType.INT32);
+ memTable.delete(path, path.getDevicePath(), 1, 10);
+ path = new MeasurementPath("root.test.d1.s3", TSDataType.INT32);
+ memTable.delete(path, path.getDevicePath(), 1, 10);
+ Assert.assertFalse(originalWorkingList.isSorted());
+
+ AlignedReadOnlyMemChunk flushingQueryMemChunk =
+ (AlignedReadOnlyMemChunk)
+ resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+ new QueryContext(2), memTable, new ArrayList<>(),
Long.MAX_VALUE, null);
+ TVList flushingQueryList =
+
flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next();
+ Assert.assertNotSame(originalWorkingList, flushingQueryList);
+
+ flushingQueryMemChunk.sortTvLists();
+ Assert.assertFalse(originalWorkingList.isSorted());
+
+ firstQueryMemChunk.sortTvLists();
+ MemPointIterator memPointIterator =
+ firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null);
+ while (memPointIterator.hasNextBatch()) {
+ memPointIterator.nextBatch();
+ }
+ }
+
@Test
public void memSeriesToStringTest() throws IOException {
TSDataType dataType = TSDataType.INT32;
@@ -743,7 +800,7 @@ public class PrimitiveMemTableTest {
list.getQueryContextSet().add(queryContext);
Map<TVList, Integer> tvlistMap = new HashMap<>();
tvlistMap.put(list, 100);
- queryContext.addTVListToSet(tvlistMap);
+ queryContext.addTVListToSet(tvlistMap.keySet());
// fragment instance execution
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);