This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch fixWriteFlushWithConcurrentQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 75bbe1ae9b36b7fad6072afafb3decfc535b1567
Author: shuwenwei <[email protected]>
AuthorDate: Tue Feb 10 10:21:51 2026 +0800

    Fixed concurrency issues caused by write and flush sorting during query 
execution
---
 .../memtable/AbstractWritableMemChunk.java         | 21 ++++++++-
 .../memtable/AlignedWritableMemChunk.java          | 11 ++---
 .../dataregion/memtable/WritableMemChunk.java      | 10 +----
 .../db/utils/datastructure/AlignedTVList.java      | 12 ++++++
 .../iotdb/db/utils/datastructure/BinaryTVList.java |  9 ++++
 .../db/utils/datastructure/BooleanTVList.java      |  9 ++++
 .../iotdb/db/utils/datastructure/DoubleTVList.java |  9 ++++
 .../iotdb/db/utils/datastructure/FloatTVList.java  |  9 ++++
 .../iotdb/db/utils/datastructure/IntTVList.java    |  9 ++++
 .../iotdb/db/utils/datastructure/LongTVList.java   |  9 ++++
 .../iotdb/db/utils/datastructure/TVList.java       |  2 +
 .../dataregion/memtable/PrimitiveMemTableTest.java | 50 ++++++++++++++++++++++
 12 files changed, 144 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index f33182fc680..fe494004529 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   protected static long RETRY_INTERVAL_MS = 100L;
   protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
 
+  protected TVList listForFlushSort;
+
   /**
    * Release the TVList if there is no query on it. Otherwise, it should set 
the first query as the
    * owner. TVList is released until all queries finish. If it throws 
memory-not-enough exception
@@ -198,7 +200,24 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   public abstract IMeasurementSchema getSchema();
 
   @Override
-  public abstract void sortTvListForFlush();
+  public synchronized void sortTvListForFlush() {
+    TVList workingList = getWorkingTVList();
+    if (workingList.isSorted()) {
+      listForFlushSort = workingList;
+      return;
+    }
+
+    boolean needCloneTimesAndIndicesInWorkingTVList;
+    workingList.lockQueryList();
+    try {
+      needCloneTimesAndIndicesInWorkingTVList = 
!workingList.getQueryContextSet().isEmpty();
+    } finally {
+      workingList.unlockQueryList();
+    }
+    listForFlushSort =
+        needCloneTimesAndIndicesInWorkingTVList ? 
workingList.cloneForFlushSort() : workingList;
+    listForFlushSort.sort();
+  }
 
   @Override
   public abstract int delete(long lowerBound, long upperBound);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 5fa0008ce75..498698f19ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -499,13 +499,6 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
     return minTime;
   }
 
-  @Override
-  public synchronized void sortTvListForFlush() {
-    if (!list.isSorted()) {
-      list.sort();
-    }
-  }
-
   @Override
   public int delete(long lowerBound, long upperBound) {
     int deletedNumber = list.delete(lowerBound, upperBound);
@@ -557,6 +550,8 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
       long maxNumberOfPointsInChunk,
       int maxNumberOfPointsInPage) {
     BitMap allValueColDeletedMap;
+    AlignedTVList list = (AlignedTVList) listForFlushSort;
+
     allValueColDeletedMap = ignoreAllNullRows ? 
list.getAllValueColDeletedMap() : null;
 
     boolean[] timeDuplicateInfo = null;
@@ -623,6 +618,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
       boolean[] timeDuplicateInfo,
       BitMap allValueColDeletedMap,
       int maxNumberOfPointsInPage) {
+    AlignedTVList list = (AlignedTVList) listForFlushSort;
     List<TSDataType> dataTypes = list.getTsDataTypes();
     Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new 
Pair[dataTypes.size()];
     for (List<Integer> pageRange : chunkRange) {
@@ -754,6 +750,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
   @Override
   public synchronized void encode(
       BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] 
times) {
+    AlignedTVList list = (AlignedTVList) listForFlushSort;
     encodeInfo.maxNumberOfPointsInChunk =
         Math.min(
             encodeInfo.maxNumberOfPointsInChunk,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 0256d5b16e5..0411690adb1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -262,13 +262,6 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
   }
 
-  @Override
-  public synchronized void sortTvListForFlush() {
-    if (!list.isSorted()) {
-      list.sort();
-    }
-  }
-
   @Override
   public TVList getWorkingTVList() {
     return list;
@@ -391,6 +384,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
   public void encodeWorkingTVList(
       BlockingQueue<Object> ioTaskQueue, long maxNumberOfPointsInChunk, long 
targetChunkSize) {
 
+    TVList list = listForFlushSort;
     TSDataType tsDataType = schema.getType();
     ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
     long dataSizeInCurrentChunk = 0;
@@ -488,7 +482,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
 
     // create MultiTvListIterator. It need not handle float/double precision 
here.
     List<TVList> tvLists = new ArrayList<>(sortedList);
-    tvLists.add(list);
+    tvLists.add(listForFlushSort);
     MemPointIterator timeValuePairIterator =
         MemPointIteratorFactory.create(
             schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 8c732d4f08e..fdbebb70960 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -165,6 +165,18 @@ public abstract class AlignedTVList extends TVList {
     return alignedTvList;
   }
 
+  @Override
+  public AlignedTVList cloneForFlushSort() {
+    AlignedTVList cloneList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypes));
+    cloneAs(cloneList);
+    cloneList.timeDeletedCnt = this.timeDeletedCnt;
+    cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize;
+    cloneList.values = this.values;
+    cloneList.bitMaps = this.bitMaps;
+    cloneList.timeColDeletedMap = this.timeColDeletedMap;
+    return cloneList;
+  }
+
   @Override
   public synchronized AlignedTVList clone() {
     AlignedTVList cloneList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypes));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index dc4ff5529d4..7899565b75c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -63,6 +63,15 @@ public abstract class BinaryTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList cloneForFlushSort() {
+    BinaryTVList cloneList = BinaryTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized BinaryTVList clone() {
     BinaryTVList cloneList = BinaryTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index b8eb0e508bf..a4cc03401bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -62,6 +62,15 @@ public abstract class BooleanTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList cloneForFlushSort() {
+    BooleanTVList cloneList = BooleanTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized BooleanTVList clone() {
     BooleanTVList cloneList = BooleanTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index f61995ef062..9897e11db56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -63,6 +63,15 @@ public abstract class DoubleTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList cloneForFlushSort() {
+    DoubleTVList cloneList = DoubleTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized DoubleTVList clone() {
     DoubleTVList cloneList = DoubleTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 3623fa49a3e..857668cd3a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -63,6 +63,15 @@ public abstract class FloatTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList cloneForFlushSort() {
+    FloatTVList cloneList = FloatTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized FloatTVList clone() {
     FloatTVList cloneList = FloatTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 0146ecf0e6b..172f0920468 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -62,6 +62,15 @@ public abstract class IntTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList cloneForFlushSort() {
+    IntTVList cloneList = IntTVList.newList(dataType);
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized IntTVList clone() {
     IntTVList cloneList = IntTVList.newList(dataType);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 7b4bd8d82d2..74789b1842f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -62,6 +62,15 @@ public abstract class LongTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList cloneForFlushSort() {
+    LongTVList cloneList = LongTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized LongTVList clone() {
     LongTVList cloneList = LongTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d61611c4cc5..b3bedd62e8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -520,6 +520,8 @@ public abstract class TVList implements WALEntryValue {
 
   protected abstract void expandValues();
 
+  public abstract TVList cloneForFlushSort();
+
   @Override
   public abstract TVList clone();
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index b16e20d4f85..0412981433a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -205,6 +205,56 @@ public class PrimitiveMemTableTest {
     }
   }
 
+  @Test
+  public void 
testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution()
+      throws QueryProcessException, IOException, IllegalPathException {
+
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    List<IMeasurementSchema> measurementSchemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32));
+    for (int i = 1000; i < 2000; i++) {
+      memTable.writeAlignedRow(
+          new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {i, i, i});
+    }
+    for (int i = 100; i < 200; i++) {
+      memTable.writeAlignedRow(
+          new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {i, i, i});
+    }
+    memTable.delete(
+        new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", 
TSDataType.INT32), 150, 160));
+    memTable.delete(
+        new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", 
TSDataType.INT32), 150, 160));
+    memTable.delete(
+        new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", 
TSDataType.INT32), 150, 160));
+    ResourceByPathUtils resourcesByPathUtils =
+        ResourceByPathUtils.getResourceInstance(
+            new AlignedFullPath(
+                new StringArrayDeviceID("root.test.d1"),
+                Arrays.asList("s1", "s2", "s3"),
+                measurementSchemas));
+    ReadOnlyMemChunk readOnlyMemChunk =
+        resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+            new QueryContext(1), memTable, null, Long.MAX_VALUE, null);
+
+    for (int i = 1; i <= 50; i++) {
+      memTable.writeAlignedRow(
+          new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {i, i, i});
+    }
+    memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), 
"s1").sortTvListForFlush();
+    memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), 
"s2").sortTvListForFlush();
+    memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), 
"s3").sortTvListForFlush();
+
+    readOnlyMemChunk.sortTvLists();
+
+    MemPointIterator memPointIterator = 
readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null);
+    while (memPointIterator.hasNextBatch()) {
+      memPointIterator.nextBatch();
+    }
+  }
+
   @Test
   public void memSeriesToStringTest() throws IOException {
     TSDataType dataType = TSDataType.INT32;

Reply via email to