This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch fixBug0518-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b30181c5dce6f49c841cb8165e83f98cbde15664
Author: shuwenwei <[email protected]>
AuthorDate: Mon May 18 18:54:47 2026 +0800

    Fix query reuse of flushing memtable TVList
---
 .../schemaregion/utils/ResourceByPathUtils.java    | 29 ++++++++++-
 .../dataregion/flush/MemTableFlushTask.java        |  1 -
 .../memtable/AbstractWritableMemChunk.java         | 20 ++++++--
 .../dataregion/memtable/IWritableMemChunk.java     |  3 ++
 .../dataregion/memtable/PrimitiveMemTableTest.java | 57 ++++++++++++++++++++++
 5 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index c64d32c1e32..98a4408bb10 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -158,10 +158,35 @@ public abstract class ResourceByPathUtils {
       }
 
       if (!isWorkMemTable) {
+        /*
+         * 1. Q1 queries this TVList while it is still in the working memtable and records a smaller
+         *    visible row count.
+         * 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the
+         *    memtable to the flushing list.
+         * 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList,
+         *    Q2's query-side sort may reorder the indices in place.
+         * 4. Q1 continues to read with its old row count and the reordered indices. The converted
+         *    value index can exceed Q1's bitmap range and cause out-of-bound access.
+         *
+         * Therefore, this flushing branch can reuse the original list only when it is already
+         * sorted or no active query is using it. Otherwise, Q2 should read from
+         * workingListForFlush.
+         */
+        boolean canUseListDirectly = list.isSorted() || 
list.getQueryContextSet().isEmpty();
         LOGGER.debug(
             "Flushing MemTable - add current query context to mutable TVList's 
query list");
-        list.getQueryContextSet().add(context);
-        tvListQueryMap.put(list, list.rowCount());
+        if (canUseListDirectly) {
+          list.getQueryContextSet().add(context);
+          tvListQueryMap.put(list, list.rowCount());
+        } else {
+          TVList workingListForFlushSort = 
memChunk.initWorkingListForFlushIfNecessary(list, true);
+          // The flush list shares value arrays with the original list, so keep the original list
+          // referenced by this query until the query finishes.
+          list.getQueryContextSet().add(context);
+          workingListForFlushSort.getQueryContextSet().add(context);
+          context.addTVListToSet(Collections.singletonMap(list, 0));
+          tvListQueryMap.put(workingListForFlushSort, 
workingListForFlushSort.rowCount());
+        }
       } else {
         if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
           LOGGER.debug(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 0c5ccc9ac1b..48efe55c223 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -278,7 +278,6 @@ public class MemTableFlushTask {
                 times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
               }
               writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
-              writableMemChunk.releaseTemporaryTvListForFlush();
               long subTaskTime = System.currentTimeMillis() - starTime;
               
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
               memSerializeTime += subTaskTime;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index 5167ea96b56..6c773942fb7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -206,8 +206,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
     /*
      * Concurrency background:
      *
-     * A query may start earlier and record the current row count (rows) of the TVList as its visible range.
-     *  After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
+     * A query may start earlier and record the current row count (rows) of the TVList as its
+     * visible range. After that, new unseq writes may arrive and immediately trigger a flush, which
+     * will sort the TVList.
      *
      * During sorting, the underlying indices array of the TVList may be reordered.
      * If the query continues to use the previously recorded rows as its upper bound,
@@ -219,6 +220,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
      * To avoid this issue, when there are active queries on the working TVList, we must
      * clone the times and indices before sorting, so that the flush sort does not mutate
      * the data structures that concurrent queries rely on.
+     *
+     * Flushing-memtable queries may also reuse workingListForFlush instead of the original working
+     * TVList for the same reason.
      */
     boolean needCloneTimesAndIndicesInWorkingTVList;
     workingList.lockQueryList();
@@ -228,7 +232,7 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
       workingList.unlockQueryList();
     }
     workingListForFlush =
-        needCloneTimesAndIndicesInWorkingTVList ? 
workingList.cloneForFlushSort() : workingList;
+        initWorkingListForFlushIfNecessary(workingList, 
needCloneTimesAndIndicesInWorkingTVList);
     workingListForFlush.sort();
   }
 
@@ -267,4 +271,14 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public abstract int serializedSize();
+
+  @Override
+  public synchronized TVList initWorkingListForFlushIfNecessary(
+      TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) {
+    if (workingListForFlush == null) {
+      workingListForFlush =
+          needCloneTimesAndIndicesInWorkingTVList ? 
workingList.cloneForFlushSort() : workingList;
+    }
+    return workingListForFlush;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 99404502437..376136e1d12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -126,4 +126,7 @@ public interface IWritableMemChunk extends WALEntryValue {
   TVList getWorkingTVList();
 
   void setWorkingTVList(TVList list);
+
+  TVList initWorkingListForFlushIfNecessary(
+      TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList);
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index b366a48ab77..b55a15bcdd7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -222,6 +222,63 @@ public class PrimitiveMemTableTest {
     }
   }
 
+  @Test
+  public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery()
+      throws QueryProcessException, IOException, IllegalPathException {
+
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    List<IMeasurementSchema> measurementSchemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32));
+    PlainDeviceID deviceID = new PlainDeviceID("root.test.d1");
+    for (int i = 1000; i < 2000; i++) {
+      memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] 
{i, i, i});
+    }
+
+    ResourceByPathUtils resourcesByPathUtils =
+        ResourceByPathUtils.getResourceInstance(
+            new AlignedPath("root.test.d1", Arrays.asList("s1", "s2", "s3"), 
measurementSchemas));
+    AlignedReadOnlyMemChunk firstQueryMemChunk =
+        (AlignedReadOnlyMemChunk)
+            resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+                new QueryContext(1), memTable, null, Long.MAX_VALUE, null);
+    TVList originalWorkingList = memTable.getWritableMemChunk(deviceID, 
"").getWorkingTVList();
+    Assert.assertSame(
+        originalWorkingList,
+        
firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next());
+
+    for (int i = 1; i <= 50; i++) {
+      memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] 
{i, i, i});
+    }
+    MeasurementPath path = new MeasurementPath("root.test.d1.s1", 
TSDataType.INT32);
+    memTable.delete(path, path.getDevicePath(), 1, 10);
+    path = new MeasurementPath("root.test.d1.s2", TSDataType.INT32);
+    memTable.delete(path, path.getDevicePath(), 1, 10);
+    path = new MeasurementPath("root.test.d1.s3", TSDataType.INT32);
+    memTable.delete(path, path.getDevicePath(), 1, 10);
+    Assert.assertFalse(originalWorkingList.isSorted());
+
+    AlignedReadOnlyMemChunk flushingQueryMemChunk =
+        (AlignedReadOnlyMemChunk)
+            resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+                new QueryContext(2), memTable, new ArrayList<>(), 
Long.MAX_VALUE, null);
+    TVList flushingQueryList =
+        
flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next();
+    Assert.assertNotSame(originalWorkingList, flushingQueryList);
+
+    flushingQueryMemChunk.sortTvLists();
+    Assert.assertFalse(originalWorkingList.isSorted());
+
+    firstQueryMemChunk.sortTvLists();
+    MemPointIterator memPointIterator =
+        firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null);
+    while (memPointIterator.hasNextBatch()) {
+      memPointIterator.nextBatch();
+    }
+  }
+
   @Test
   public void memSeriesToStringTest() throws IOException {
     TSDataType dataType = TSDataType.INT32;

Reply via email to