This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a0a7288cd Fixed concurrency issues caused by write and flush sorting during query execution (#17193)
15a0a7288cd is described below

commit 15a0a7288cd2de3b89d6ec173a39cf9a65c08fce
Author: shuwenwei <[email protected]>
AuthorDate: Wed Feb 11 18:17:08 2026 +0800

    Fixed concurrency issues caused by write and flush sorting during query execution (#17193)
---
 .../dataregion/flush/MemTableFlushTask.java        |  1 +
 .../memtable/AbstractWritableMemChunk.java         | 43 +++++++++-
 .../memtable/AlignedWritableMemChunk.java          | 94 +++++++++++++---------
 .../dataregion/memtable/IWritableMemChunk.java     |  2 +
 .../dataregion/memtable/WritableMemChunk.java      | 38 ++++-----
 .../db/utils/datastructure/AlignedTVList.java      | 12 +++
 .../iotdb/db/utils/datastructure/BinaryTVList.java |  9 +++
 .../db/utils/datastructure/BooleanTVList.java      |  9 +++
 .../iotdb/db/utils/datastructure/DoubleTVList.java |  9 +++
 .../iotdb/db/utils/datastructure/FloatTVList.java  |  9 +++
 .../iotdb/db/utils/datastructure/IntTVList.java    |  9 +++
 .../iotdb/db/utils/datastructure/LongTVList.java   |  9 +++
 .../iotdb/db/utils/datastructure/TVList.java       |  2 +
 .../dataregion/memtable/PrimitiveMemTableTest.java | 48 +++++++++++
 14 files changed, 236 insertions(+), 58 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 6977167f0e6..ffcbb97ffd1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -272,6 +272,7 @@ public class MemTableFlushTask {
                 times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
               }
               writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
+              writableMemChunk.releaseTemporaryTvListForFlush();
               long subTaskTime = System.currentTimeMillis() - starTime;
               
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
               memSerializeTime += subTaskTime;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index f33182fc680..7a3233d58b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   protected static long RETRY_INTERVAL_MS = 100L;
   protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
 
+  protected TVList workingListForFlush;
+
   /**
    * Release the TVList if there is no query on it. Otherwise, it should set the first query as the
    * owner. TVList is released until all queries finish. If it throws memory-not-enough exception
@@ -198,7 +200,46 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   public abstract IMeasurementSchema getSchema();
 
   @Override
-  public abstract void sortTvListForFlush();
+  public void sortTvListForFlush() {
+    TVList workingList = getWorkingTVList();
+    if (workingList.isSorted()) {
+      workingListForFlush = workingList;
+      return;
+    }
+
+    /*
+     * Concurrency background:
+     *
+     * A query may start earlier and record the current row count (rows) of the TVList as its visible range.
+     *  After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
+     *
+     * During sorting, the underlying indices array of the TVList may be reordered.
+     * If the query continues to use the previously recorded rows as its upper bound,
+     * it may convert a logical index to a physical index via the updated indices array.
+     *
+     * In this case, the converted physical index may exceed the previously visible
+     * rows range, leading to invalid access or unexpected behavior.
+     *
+     * To avoid this issue, when there are active queries on the working TVList, we must
+     * clone the times and indices before sorting, so that the flush sort does not mutate
+     * the data structures that concurrent queries rely on.
+     */
+    boolean needCloneTimesAndIndicesInWorkingTVList;
+    workingList.lockQueryList();
+    try {
+      needCloneTimesAndIndicesInWorkingTVList = 
!workingList.getQueryContextSet().isEmpty();
+    } finally {
+      workingList.unlockQueryList();
+    }
+    workingListForFlush =
+        needCloneTimesAndIndicesInWorkingTVList ? 
workingList.cloneForFlushSort() : workingList;
+    workingListForFlush.sort();
+  }
+
+  @Override
+  public void releaseTemporaryTvListForFlush() {
+    workingListForFlush = null;
+  }
 
   @Override
   public abstract int delete(long lowerBound, long upperBound);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 5fa0008ce75..86878ea8dc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -68,6 +68,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
   private Map<String, Integer> measurementIndexMap;
   private List<TSDataType> dataTypes;
   private final List<IMeasurementSchema> schemaList;
+  // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask
   private AlignedTVList list;
   private List<AlignedTVList> sortedList;
   private long sortedRowCount = 0;
@@ -499,13 +500,6 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
     return minTime;
   }
 
-  @Override
-  public synchronized void sortTvListForFlush() {
-    if (!list.isSorted()) {
-      list.sort();
-    }
-  }
-
   @Override
   public int delete(long lowerBound, long upperBound) {
     int deletedNumber = list.delete(lowerBound, upperBound);
@@ -557,7 +551,10 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
       long maxNumberOfPointsInChunk,
       int maxNumberOfPointsInPage) {
     BitMap allValueColDeletedMap;
-    allValueColDeletedMap = ignoreAllNullRows ? 
list.getAllValueColDeletedMap() : null;
+    AlignedTVList alignedWorkingListForFlush = (AlignedTVList) 
workingListForFlush;
+
+    allValueColDeletedMap =
+        ignoreAllNullRows ? 
alignedWorkingListForFlush.getAllValueColDeletedMap() : null;
 
     boolean[] timeDuplicateInfo = null;
 
@@ -569,8 +566,10 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
     int pointNumInPage = 0;
     int pointNumInChunk = 0;
 
-    for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
-      long time = list.getTime(sortedRowIndex);
+    for (int sortedRowIndex = 0;
+        sortedRowIndex < alignedWorkingListForFlush.rowCount();
+        sortedRowIndex++) {
+      long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
       if (pointNumInPage == 0) {
         pageRange.add(sortedRowIndex);
       }
@@ -591,15 +590,17 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
       }
 
       int nextRowIndex = sortedRowIndex + 1;
-      while (nextRowIndex < list.rowCount()
+      while (nextRowIndex < alignedWorkingListForFlush.rowCount()
           && ((allValueColDeletedMap != null
-                  && 
allValueColDeletedMap.isMarked(list.getValueIndex(nextRowIndex)))
-              || list.isTimeDeleted(nextRowIndex))) {
+                  && allValueColDeletedMap.isMarked(
+                      alignedWorkingListForFlush.getValueIndex(nextRowIndex)))
+              || alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) {
         nextRowIndex++;
       }
-      if (nextRowIndex != list.rowCount() && time == 
list.getTime(nextRowIndex)) {
+      if (nextRowIndex != alignedWorkingListForFlush.rowCount()
+          && time == alignedWorkingListForFlush.getTime(nextRowIndex)) {
         if (Objects.isNull(timeDuplicateInfo)) {
-          timeDuplicateInfo = new boolean[list.rowCount()];
+          timeDuplicateInfo = new 
boolean[alignedWorkingListForFlush.rowCount()];
         }
         timeDuplicateInfo[sortedRowIndex] = true;
       }
@@ -607,7 +608,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
     }
 
     if (pointNumInPage != 0) {
-      pageRange.add(list.rowCount() - 1);
+      pageRange.add(alignedWorkingListForFlush.rowCount() - 1);
     }
     if (pointNumInChunk != 0) {
       chunkRange.add(pageRange);
@@ -623,7 +624,8 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
       boolean[] timeDuplicateInfo,
       BitMap allValueColDeletedMap,
       int maxNumberOfPointsInPage) {
-    List<TSDataType> dataTypes = list.getTsDataTypes();
+    AlignedTVList alignedWorkingListForFlush = (AlignedTVList) 
workingListForFlush;
+    List<TSDataType> dataTypes = alignedWorkingListForFlush.getTsDataTypes();
     Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new 
Pair[dataTypes.size()];
     for (List<Integer> pageRange : chunkRange) {
       AlignedChunkWriterImpl alignedChunkWriter =
@@ -641,16 +643,18 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
               sortedRowIndex++) {
             // skip empty row
             if (allValueColDeletedMap != null
-                && 
allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) {
+                && allValueColDeletedMap.isMarked(
+                    alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) 
{
               continue;
             }
             // skip time duplicated rows
-            long time = list.getTime(sortedRowIndex);
+            long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
             if (Objects.nonNull(timeDuplicateInfo)) {
-              if (!list.isNullValue(list.getValueIndex(sortedRowIndex), 
columnIndex)) {
+              if (!alignedWorkingListForFlush.isNullValue(
+                  alignedWorkingListForFlush.getValueIndex(sortedRowIndex), 
columnIndex)) {
                 lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
                 lastValidPointIndexForTimeDupCheck[columnIndex].right =
-                    list.getValueIndex(sortedRowIndex);
+                    alignedWorkingListForFlush.getValueIndex(sortedRowIndex);
               }
               if (timeDuplicateInfo[sortedRowIndex]) {
                 continue;
@@ -671,41 +675,55 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
                 && (time == 
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
               originRowIndex = 
lastValidPointIndexForTimeDupCheck[columnIndex].right;
             } else {
-              originRowIndex = list.getValueIndex(sortedRowIndex);
+              originRowIndex = 
alignedWorkingListForFlush.getValueIndex(sortedRowIndex);
             }
 
-            boolean isNull = list.isNullValue(originRowIndex, columnIndex);
+            boolean isNull = 
alignedWorkingListForFlush.isNullValue(originRowIndex, columnIndex);
             switch (tsDataType) {
               case BOOLEAN:
                 alignedChunkWriter.writeByColumn(
                     time,
-                    !isNull && list.getBooleanByValueIndex(originRowIndex, 
columnIndex),
+                    !isNull
+                        && alignedWorkingListForFlush.getBooleanByValueIndex(
+                            originRowIndex, columnIndex),
                     isNull);
                 break;
               case INT32:
               case DATE:
                 alignedChunkWriter.writeByColumn(
                     time,
-                    isNull ? 0 : list.getIntByValueIndex(originRowIndex, 
columnIndex),
+                    isNull
+                        ? 0
+                        : alignedWorkingListForFlush.getIntByValueIndex(
+                            originRowIndex, columnIndex),
                     isNull);
                 break;
               case INT64:
               case TIMESTAMP:
                 alignedChunkWriter.writeByColumn(
                     time,
-                    isNull ? 0 : list.getLongByValueIndex(originRowIndex, 
columnIndex),
+                    isNull
+                        ? 0
+                        : alignedWorkingListForFlush.getLongByValueIndex(
+                            originRowIndex, columnIndex),
                     isNull);
                 break;
               case FLOAT:
                 alignedChunkWriter.writeByColumn(
                     time,
-                    isNull ? 0 : list.getFloatByValueIndex(originRowIndex, 
columnIndex),
+                    isNull
+                        ? 0
+                        : alignedWorkingListForFlush.getFloatByValueIndex(
+                            originRowIndex, columnIndex),
                     isNull);
                 break;
               case DOUBLE:
                 alignedChunkWriter.writeByColumn(
                     time,
-                    isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, 
columnIndex),
+                    isNull
+                        ? 0
+                        : alignedWorkingListForFlush.getDoubleByValueIndex(
+                            originRowIndex, columnIndex),
                     isNull);
                 break;
               case TEXT:
@@ -714,7 +732,10 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
               case OBJECT:
                 alignedChunkWriter.writeByColumn(
                     time,
-                    isNull ? null : list.getBinaryByValueIndex(originRowIndex, 
columnIndex),
+                    isNull
+                        ? null
+                        : alignedWorkingListForFlush.getBinaryByValueIndex(
+                            originRowIndex, columnIndex),
                     isNull);
                 break;
               default:
@@ -724,19 +745,21 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
           alignedChunkWriter.nextColumn();
         }
 
-        long[] times = new long[Math.min(maxNumberOfPointsInPage, 
list.rowCount())];
+        long[] times =
+            new long[Math.min(maxNumberOfPointsInPage, 
alignedWorkingListForFlush.rowCount())];
         int pointsInPage = 0;
         for (int sortedRowIndex = pageRange.get(pageNum * 2);
             sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
             sortedRowIndex++) {
           // skip empty row
           if (((allValueColDeletedMap != null
-                  && 
allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex)))
-              || (list.isTimeDeleted(sortedRowIndex)))) {
+                  && allValueColDeletedMap.isMarked(
+                      
alignedWorkingListForFlush.getValueIndex(sortedRowIndex)))
+              || (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) {
             continue;
           }
           if (Objects.isNull(timeDuplicateInfo) || 
!timeDuplicateInfo[sortedRowIndex]) {
-            times[pointsInPage++] = list.getTime(sortedRowIndex);
+            times[pointsInPage++] = 
alignedWorkingListForFlush.getTime(sortedRowIndex);
           }
         }
         alignedChunkWriter.write(times, pointsInPage, 0);
@@ -752,8 +775,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
   }
 
   @Override
-  public synchronized void encode(
-      BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] 
times) {
+  public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo 
encodeInfo, long[] times) {
     encodeInfo.maxNumberOfPointsInChunk =
         Math.min(
             encodeInfo.maxNumberOfPointsInChunk,
@@ -770,7 +792,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
 
     // create MergeSortAlignedTVListIterator.
     List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
-    alignedTvLists.add(list);
+    alignedTvLists.add((AlignedTVList) workingListForFlush);
     List<Integer> columnIndexList = buildColumnIndexList(schemaList);
     MemPointIterator timeValuePairIterator =
         MemPointIteratorFactory.create(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 635e06da4b7..af55d7df9d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -98,6 +98,8 @@ public interface IWritableMemChunk extends WALEntryValue {
    */
   void sortTvListForFlush();
 
+  void releaseTemporaryTvListForFlush();
+
   default long getMaxTime() {
     return Long.MAX_VALUE;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 0256d5b16e5..571a371c849 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -59,6 +59,7 @@ import static 
org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 public class WritableMemChunk extends AbstractWritableMemChunk {
 
   private IMeasurementSchema schema;
+  // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask
   private TVList list;
   private List<TVList> sortedList;
   private long sortedRowCount = 0;
@@ -262,13 +263,6 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
   }
 
-  @Override
-  public synchronized void sortTvListForFlush() {
-    if (!list.isSorted()) {
-      list.sort();
-    }
-  }
-
   @Override
   public TVList getWorkingTVList() {
     return list;
@@ -395,50 +389,53 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
     ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
     long dataSizeInCurrentChunk = 0;
     int pointNumInCurrentChunk = 0;
-    for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
-      if (list.isNullValue(list.getValueIndex(sortedRowIndex))) {
+    for (int sortedRowIndex = 0;
+        sortedRowIndex < workingListForFlush.rowCount();
+        sortedRowIndex++) {
+      if 
(workingListForFlush.isNullValue(workingListForFlush.getValueIndex(sortedRowIndex)))
 {
         continue;
       }
 
-      long time = list.getTime(sortedRowIndex);
+      long time = workingListForFlush.getTime(sortedRowIndex);
 
       // skip duplicated data
-      if ((sortedRowIndex + 1 < list.rowCount() && (time == 
list.getTime(sortedRowIndex + 1)))) {
+      if ((sortedRowIndex + 1 < workingListForFlush.rowCount()
+          && (time == workingListForFlush.getTime(sortedRowIndex + 1)))) {
         continue;
       }
 
       // store last point for SDT
-      if (sortedRowIndex + 1 == list.rowCount()) {
+      if (sortedRowIndex + 1 == workingListForFlush.rowCount()) {
         chunkWriterImpl.setLastPoint(true);
       }
 
       switch (tsDataType) {
         case BOOLEAN:
-          chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex));
+          chunkWriterImpl.write(time, 
workingListForFlush.getBoolean(sortedRowIndex));
           dataSizeInCurrentChunk += 8L + 1L;
           break;
         case INT32:
         case DATE:
-          chunkWriterImpl.write(time, list.getInt(sortedRowIndex));
+          chunkWriterImpl.write(time, 
workingListForFlush.getInt(sortedRowIndex));
           dataSizeInCurrentChunk += 8L + 4L;
           break;
         case INT64:
         case TIMESTAMP:
-          chunkWriterImpl.write(time, list.getLong(sortedRowIndex));
+          chunkWriterImpl.write(time, 
workingListForFlush.getLong(sortedRowIndex));
           dataSizeInCurrentChunk += 8L + 8L;
           break;
         case FLOAT:
-          chunkWriterImpl.write(time, list.getFloat(sortedRowIndex));
+          chunkWriterImpl.write(time, 
workingListForFlush.getFloat(sortedRowIndex));
           dataSizeInCurrentChunk += 8L + 4L;
           break;
         case DOUBLE:
-          chunkWriterImpl.write(time, list.getDouble(sortedRowIndex));
+          chunkWriterImpl.write(time, 
workingListForFlush.getDouble(sortedRowIndex));
           dataSizeInCurrentChunk += 8L + 8L;
           break;
         case TEXT:
         case BLOB:
         case STRING:
-          Binary value = list.getBinary(sortedRowIndex);
+          Binary value = workingListForFlush.getBinary(sortedRowIndex);
           chunkWriterImpl.write(time, value);
           dataSizeInCurrentChunk += 8L + getBinarySize(value);
           break;
@@ -473,8 +470,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
   }
 
   @Override
-  public synchronized void encode(
-      BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] 
times) {
+  public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo 
encodeInfo, long[] times) {
     if (TVLIST_SORT_THRESHOLD == 0) {
       encodeWorkingTVList(
           ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk, 
encodeInfo.targetChunkSize);
@@ -488,7 +484,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
 
     // create MultiTvListIterator. It need not handle float/double precision 
here.
     List<TVList> tvLists = new ArrayList<>(sortedList);
-    tvLists.add(list);
+    tvLists.add(workingListForFlush);
     MemPointIterator timeValuePairIterator =
         MemPointIteratorFactory.create(
             schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 8c732d4f08e..2c87fdb32ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -165,6 +165,18 @@ public abstract class AlignedTVList extends TVList {
     return alignedTvList;
   }
 
+  @Override
+  public synchronized AlignedTVList cloneForFlushSort() {
+    AlignedTVList cloneList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypes));
+    cloneAs(cloneList);
+    cloneList.timeDeletedCnt = this.timeDeletedCnt;
+    cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize;
+    cloneList.values = this.values;
+    cloneList.bitMaps = this.bitMaps;
+    cloneList.timeColDeletedMap = this.timeColDeletedMap;
+    return cloneList;
+  }
+
   @Override
   public synchronized AlignedTVList clone() {
     AlignedTVList cloneList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypes));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index dc4ff5529d4..b274bc8ef65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -63,6 +63,15 @@ public abstract class BinaryTVList extends TVList {
     }
   }
 
+  @Override
+  public synchronized TVList cloneForFlushSort() {
+    BinaryTVList cloneList = BinaryTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized BinaryTVList clone() {
     BinaryTVList cloneList = BinaryTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index b8eb0e508bf..5e3461acb2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -62,6 +62,15 @@ public abstract class BooleanTVList extends TVList {
     }
   }
 
+  @Override
+  public synchronized TVList cloneForFlushSort() {
+    BooleanTVList cloneList = BooleanTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized BooleanTVList clone() {
     BooleanTVList cloneList = BooleanTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index f61995ef062..753ca2020a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -63,6 +63,15 @@ public abstract class DoubleTVList extends TVList {
     }
   }
 
+  @Override
+  public synchronized TVList cloneForFlushSort() {
+    DoubleTVList cloneList = DoubleTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized DoubleTVList clone() {
     DoubleTVList cloneList = DoubleTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 3623fa49a3e..517dc208211 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -63,6 +63,15 @@ public abstract class FloatTVList extends TVList {
     }
   }
 
+  @Override
+  public synchronized TVList cloneForFlushSort() {
+    FloatTVList cloneList = FloatTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized FloatTVList clone() {
     FloatTVList cloneList = FloatTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 0146ecf0e6b..a51e785d414 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -62,6 +62,15 @@ public abstract class IntTVList extends TVList {
     }
   }
 
+  @Override
+  public synchronized TVList cloneForFlushSort() {
+    IntTVList cloneList = IntTVList.newList(dataType);
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized IntTVList clone() {
     IntTVList cloneList = IntTVList.newList(dataType);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 7b4bd8d82d2..544d2cb7dd0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -62,6 +62,15 @@ public abstract class LongTVList extends TVList {
     }
   }
 
+  @Override
+  public synchronized TVList cloneForFlushSort() {
+    LongTVList cloneList = LongTVList.newList();
+    cloneAs(cloneList);
+    cloneList.bitMap = this.bitMap;
+    cloneList.values = this.values;
+    return cloneList;
+  }
+
   @Override
   public synchronized LongTVList clone() {
     LongTVList cloneList = LongTVList.newList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d61611c4cc5..b3bedd62e8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -520,6 +520,8 @@ public abstract class TVList implements WALEntryValue {
 
   protected abstract void expandValues();
 
+  public abstract TVList cloneForFlushSort();
+
   @Override
   public abstract TVList clone();
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index 7e77abca29e..169dd11042f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -205,6 +205,54 @@ public class PrimitiveMemTableTest {
     }
   }
 
+  @Test
+  public void 
testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution()
+      throws QueryProcessException, IOException, IllegalPathException {
+
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    List<IMeasurementSchema> measurementSchemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32));
+    for (int i = 1000; i < 2000; i++) {
+      memTable.writeAlignedRow(
+          new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {i, i, i});
+    }
+    for (int i = 100; i < 200; i++) {
+      memTable.writeAlignedRow(
+          new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {i, i, i});
+    }
+    memTable.delete(
+        new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", 
TSDataType.INT32), 150, 160));
+    memTable.delete(
+        new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", 
TSDataType.INT32), 150, 160));
+    memTable.delete(
+        new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", 
TSDataType.INT32), 150, 160));
+    ResourceByPathUtils resourcesByPathUtils =
+        ResourceByPathUtils.getResourceInstance(
+            new AlignedFullPath(
+                new StringArrayDeviceID("root.test.d1"),
+                Arrays.asList("s1", "s2", "s3"),
+                measurementSchemas));
+    ReadOnlyMemChunk readOnlyMemChunk =
+        resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+            new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null);
+
+    for (int i = 1; i <= 50; i++) {
+      memTable.writeAlignedRow(
+          new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new 
Object[] {i, i, i});
+    }
+    memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), 
"").sortTvListForFlush();
+
+    readOnlyMemChunk.sortTvLists();
+
+    MemPointIterator memPointIterator = 
readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null);
+    while (memPointIterator.hasNextBatch()) {
+      memPointIterator.nextBatch();
+    }
+  }
+
   @Test
   public void memSeriesToStringTest() throws IOException {
     TSDataType dataType = TSDataType.INT32;

Reply via email to