This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 15a0a7288cd Fixed concurrency issues caused by write and flush sorting
during query execution (#17193)
15a0a7288cd is described below
commit 15a0a7288cd2de3b89d6ec173a39cf9a65c08fce
Author: shuwenwei <[email protected]>
AuthorDate: Wed Feb 11 18:17:08 2026 +0800
Fixed concurrency issues caused by write and flush sorting during query
execution (#17193)
---
.../dataregion/flush/MemTableFlushTask.java | 1 +
.../memtable/AbstractWritableMemChunk.java | 43 +++++++++-
.../memtable/AlignedWritableMemChunk.java | 94 +++++++++++++---------
.../dataregion/memtable/IWritableMemChunk.java | 2 +
.../dataregion/memtable/WritableMemChunk.java | 38 ++++-----
.../db/utils/datastructure/AlignedTVList.java | 12 +++
.../iotdb/db/utils/datastructure/BinaryTVList.java | 9 +++
.../db/utils/datastructure/BooleanTVList.java | 9 +++
.../iotdb/db/utils/datastructure/DoubleTVList.java | 9 +++
.../iotdb/db/utils/datastructure/FloatTVList.java | 9 +++
.../iotdb/db/utils/datastructure/IntTVList.java | 9 +++
.../iotdb/db/utils/datastructure/LongTVList.java | 9 +++
.../iotdb/db/utils/datastructure/TVList.java | 2 +
.../dataregion/memtable/PrimitiveMemTableTest.java | 48 +++++++++++
14 files changed, 236 insertions(+), 58 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 6977167f0e6..ffcbb97ffd1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -272,6 +272,7 @@ public class MemTableFlushTask {
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
+ writableMemChunk.releaseTemporaryTvListForFlush();
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK,
subTaskTime);
memSerializeTime += subTaskTime;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index f33182fc680..7a3233d58b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
protected static long RETRY_INTERVAL_MS = 100L;
protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
+ protected TVList workingListForFlush;
+
/**
* Release the TVList if there is no query on it. Otherwise, it should set
the first query as the
* owner. TVList is released until all queries finish. If it throws
memory-not-enough exception
@@ -198,7 +200,46 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
public abstract IMeasurementSchema getSchema();
@Override
- public abstract void sortTvListForFlush();
+ public void sortTvListForFlush() {
+ TVList workingList = getWorkingTVList();
+ if (workingList.isSorted()) {
+ workingListForFlush = workingList;
+ return;
+ }
+
+ /*
+ * Concurrency background:
+ *
+ * A query may start earlier and record the current row count (rows) of
the TVList as its visible range.
+ * After that, new unseq writes may arrive and immediately trigger a
flush, which will sort the TVList.
+ *
+ * During sorting, the underlying indices array of the TVList may be
reordered.
+ * If the query continues to use the previously recorded rows as its upper
bound,
+ * it may convert a logical index to a physical index via the updated
indices array.
+ *
+ * In this case, the converted physical index may exceed the previously
visible
+ * rows range, leading to invalid access or unexpected behavior.
+ *
+ * To avoid this issue, when there are active queries on the working
TVList, we must
+ * clone the times and indices before sorting, so that the flush sort does
not mutate
+ * the data structures that concurrent queries rely on.
+ */
+ boolean needCloneTimesAndIndicesInWorkingTVList;
+ workingList.lockQueryList();
+ try {
+ needCloneTimesAndIndicesInWorkingTVList =
!workingList.getQueryContextSet().isEmpty();
+ } finally {
+ workingList.unlockQueryList();
+ }
+ workingListForFlush =
+ needCloneTimesAndIndicesInWorkingTVList ?
workingList.cloneForFlushSort() : workingList;
+ workingListForFlush.sort();
+ }
+
+ @Override
+ public void releaseTemporaryTvListForFlush() {
+ workingListForFlush = null;
+ }
@Override
public abstract int delete(long lowerBound, long upperBound);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 5fa0008ce75..86878ea8dc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -68,6 +68,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
private Map<String, Integer> measurementIndexMap;
private List<TSDataType> dataTypes;
private final List<IMeasurementSchema> schemaList;
+ // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in
FlushTask
private AlignedTVList list;
private List<AlignedTVList> sortedList;
private long sortedRowCount = 0;
@@ -499,13 +500,6 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
return minTime;
}
- @Override
- public synchronized void sortTvListForFlush() {
- if (!list.isSorted()) {
- list.sort();
- }
- }
-
@Override
public int delete(long lowerBound, long upperBound) {
int deletedNumber = list.delete(lowerBound, upperBound);
@@ -557,7 +551,10 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
long maxNumberOfPointsInChunk,
int maxNumberOfPointsInPage) {
BitMap allValueColDeletedMap;
- allValueColDeletedMap = ignoreAllNullRows ?
list.getAllValueColDeletedMap() : null;
+ AlignedTVList alignedWorkingListForFlush = (AlignedTVList)
workingListForFlush;
+
+ allValueColDeletedMap =
+ ignoreAllNullRows ?
alignedWorkingListForFlush.getAllValueColDeletedMap() : null;
boolean[] timeDuplicateInfo = null;
@@ -569,8 +566,10 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
int pointNumInPage = 0;
int pointNumInChunk = 0;
- for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount();
sortedRowIndex++) {
- long time = list.getTime(sortedRowIndex);
+ for (int sortedRowIndex = 0;
+ sortedRowIndex < alignedWorkingListForFlush.rowCount();
+ sortedRowIndex++) {
+ long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
if (pointNumInPage == 0) {
pageRange.add(sortedRowIndex);
}
@@ -591,15 +590,17 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
int nextRowIndex = sortedRowIndex + 1;
- while (nextRowIndex < list.rowCount()
+ while (nextRowIndex < alignedWorkingListForFlush.rowCount()
&& ((allValueColDeletedMap != null
- &&
allValueColDeletedMap.isMarked(list.getValueIndex(nextRowIndex)))
- || list.isTimeDeleted(nextRowIndex))) {
+ && allValueColDeletedMap.isMarked(
+ alignedWorkingListForFlush.getValueIndex(nextRowIndex)))
+ || alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) {
nextRowIndex++;
}
- if (nextRowIndex != list.rowCount() && time ==
list.getTime(nextRowIndex)) {
+ if (nextRowIndex != alignedWorkingListForFlush.rowCount()
+ && time == alignedWorkingListForFlush.getTime(nextRowIndex)) {
if (Objects.isNull(timeDuplicateInfo)) {
- timeDuplicateInfo = new boolean[list.rowCount()];
+ timeDuplicateInfo = new
boolean[alignedWorkingListForFlush.rowCount()];
}
timeDuplicateInfo[sortedRowIndex] = true;
}
@@ -607,7 +608,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
if (pointNumInPage != 0) {
- pageRange.add(list.rowCount() - 1);
+ pageRange.add(alignedWorkingListForFlush.rowCount() - 1);
}
if (pointNumInChunk != 0) {
chunkRange.add(pageRange);
@@ -623,7 +624,8 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
boolean[] timeDuplicateInfo,
BitMap allValueColDeletedMap,
int maxNumberOfPointsInPage) {
- List<TSDataType> dataTypes = list.getTsDataTypes();
+ AlignedTVList alignedWorkingListForFlush = (AlignedTVList)
workingListForFlush;
+ List<TSDataType> dataTypes = alignedWorkingListForFlush.getTsDataTypes();
Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new
Pair[dataTypes.size()];
for (List<Integer> pageRange : chunkRange) {
AlignedChunkWriterImpl alignedChunkWriter =
@@ -641,16 +643,18 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
sortedRowIndex++) {
// skip empty row
if (allValueColDeletedMap != null
- &&
allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) {
+ && allValueColDeletedMap.isMarked(
+ alignedWorkingListForFlush.getValueIndex(sortedRowIndex)))
{
continue;
}
// skip time duplicated rows
- long time = list.getTime(sortedRowIndex);
+ long time = alignedWorkingListForFlush.getTime(sortedRowIndex);
if (Objects.nonNull(timeDuplicateInfo)) {
- if (!list.isNullValue(list.getValueIndex(sortedRowIndex),
columnIndex)) {
+ if (!alignedWorkingListForFlush.isNullValue(
+ alignedWorkingListForFlush.getValueIndex(sortedRowIndex),
columnIndex)) {
lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
lastValidPointIndexForTimeDupCheck[columnIndex].right =
- list.getValueIndex(sortedRowIndex);
+ alignedWorkingListForFlush.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
continue;
@@ -671,41 +675,55 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
&& (time ==
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
originRowIndex =
lastValidPointIndexForTimeDupCheck[columnIndex].right;
} else {
- originRowIndex = list.getValueIndex(sortedRowIndex);
+ originRowIndex =
alignedWorkingListForFlush.getValueIndex(sortedRowIndex);
}
- boolean isNull = list.isNullValue(originRowIndex, columnIndex);
+ boolean isNull =
alignedWorkingListForFlush.isNullValue(originRowIndex, columnIndex);
switch (tsDataType) {
case BOOLEAN:
alignedChunkWriter.writeByColumn(
time,
- !isNull && list.getBooleanByValueIndex(originRowIndex,
columnIndex),
+ !isNull
+ && alignedWorkingListForFlush.getBooleanByValueIndex(
+ originRowIndex, columnIndex),
isNull);
break;
case INT32:
case DATE:
alignedChunkWriter.writeByColumn(
time,
- isNull ? 0 : list.getIntByValueIndex(originRowIndex,
columnIndex),
+ isNull
+ ? 0
+ : alignedWorkingListForFlush.getIntByValueIndex(
+ originRowIndex, columnIndex),
isNull);
break;
case INT64:
case TIMESTAMP:
alignedChunkWriter.writeByColumn(
time,
- isNull ? 0 : list.getLongByValueIndex(originRowIndex,
columnIndex),
+ isNull
+ ? 0
+ : alignedWorkingListForFlush.getLongByValueIndex(
+ originRowIndex, columnIndex),
isNull);
break;
case FLOAT:
alignedChunkWriter.writeByColumn(
time,
- isNull ? 0 : list.getFloatByValueIndex(originRowIndex,
columnIndex),
+ isNull
+ ? 0
+ : alignedWorkingListForFlush.getFloatByValueIndex(
+ originRowIndex, columnIndex),
isNull);
break;
case DOUBLE:
alignedChunkWriter.writeByColumn(
time,
- isNull ? 0 : list.getDoubleByValueIndex(originRowIndex,
columnIndex),
+ isNull
+ ? 0
+ : alignedWorkingListForFlush.getDoubleByValueIndex(
+ originRowIndex, columnIndex),
isNull);
break;
case TEXT:
@@ -714,7 +732,10 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
case OBJECT:
alignedChunkWriter.writeByColumn(
time,
- isNull ? null : list.getBinaryByValueIndex(originRowIndex,
columnIndex),
+ isNull
+ ? null
+ : alignedWorkingListForFlush.getBinaryByValueIndex(
+ originRowIndex, columnIndex),
isNull);
break;
default:
@@ -724,19 +745,21 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
alignedChunkWriter.nextColumn();
}
- long[] times = new long[Math.min(maxNumberOfPointsInPage,
list.rowCount())];
+ long[] times =
+ new long[Math.min(maxNumberOfPointsInPage,
alignedWorkingListForFlush.rowCount())];
int pointsInPage = 0;
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (((allValueColDeletedMap != null
- &&
allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex)))
- || (list.isTimeDeleted(sortedRowIndex)))) {
+ && allValueColDeletedMap.isMarked(
+
alignedWorkingListForFlush.getValueIndex(sortedRowIndex)))
+ || (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) {
continue;
}
if (Objects.isNull(timeDuplicateInfo) ||
!timeDuplicateInfo[sortedRowIndex]) {
- times[pointsInPage++] = list.getTime(sortedRowIndex);
+ times[pointsInPage++] =
alignedWorkingListForFlush.getTime(sortedRowIndex);
}
}
alignedChunkWriter.write(times, pointsInPage, 0);
@@ -752,8 +775,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
@Override
- public synchronized void encode(
- BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
+ public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo
encodeInfo, long[] times) {
encodeInfo.maxNumberOfPointsInChunk =
Math.min(
encodeInfo.maxNumberOfPointsInChunk,
@@ -770,7 +792,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
// create MergeSortAlignedTVListIterator.
List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
- alignedTvLists.add(list);
+ alignedTvLists.add((AlignedTVList) workingListForFlush);
List<Integer> columnIndexList = buildColumnIndexList(schemaList);
MemPointIterator timeValuePairIterator =
MemPointIteratorFactory.create(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 635e06da4b7..af55d7df9d1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -98,6 +98,8 @@ public interface IWritableMemChunk extends WALEntryValue {
*/
void sortTvListForFlush();
+ void releaseTemporaryTvListForFlush();
+
default long getMaxTime() {
return Long.MAX_VALUE;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 0256d5b16e5..571a371c849 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -59,6 +59,7 @@ import static
org.apache.iotdb.db.utils.MemUtils.getBinarySize;
public class WritableMemChunk extends AbstractWritableMemChunk {
private IMeasurementSchema schema;
+ // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in
FlushTask
private TVList list;
private List<TVList> sortedList;
private long sortedRowCount = 0;
@@ -262,13 +263,6 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
}
- @Override
- public synchronized void sortTvListForFlush() {
- if (!list.isSorted()) {
- list.sort();
- }
- }
-
@Override
public TVList getWorkingTVList() {
return list;
@@ -395,50 +389,53 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
long dataSizeInCurrentChunk = 0;
int pointNumInCurrentChunk = 0;
- for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount();
sortedRowIndex++) {
- if (list.isNullValue(list.getValueIndex(sortedRowIndex))) {
+ for (int sortedRowIndex = 0;
+ sortedRowIndex < workingListForFlush.rowCount();
+ sortedRowIndex++) {
+ if
(workingListForFlush.isNullValue(workingListForFlush.getValueIndex(sortedRowIndex)))
{
continue;
}
- long time = list.getTime(sortedRowIndex);
+ long time = workingListForFlush.getTime(sortedRowIndex);
// skip duplicated data
- if ((sortedRowIndex + 1 < list.rowCount() && (time ==
list.getTime(sortedRowIndex + 1)))) {
+ if ((sortedRowIndex + 1 < workingListForFlush.rowCount()
+ && (time == workingListForFlush.getTime(sortedRowIndex + 1)))) {
continue;
}
// store last point for SDT
- if (sortedRowIndex + 1 == list.rowCount()) {
+ if (sortedRowIndex + 1 == workingListForFlush.rowCount()) {
chunkWriterImpl.setLastPoint(true);
}
switch (tsDataType) {
case BOOLEAN:
- chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex));
+ chunkWriterImpl.write(time,
workingListForFlush.getBoolean(sortedRowIndex));
dataSizeInCurrentChunk += 8L + 1L;
break;
case INT32:
case DATE:
- chunkWriterImpl.write(time, list.getInt(sortedRowIndex));
+ chunkWriterImpl.write(time,
workingListForFlush.getInt(sortedRowIndex));
dataSizeInCurrentChunk += 8L + 4L;
break;
case INT64:
case TIMESTAMP:
- chunkWriterImpl.write(time, list.getLong(sortedRowIndex));
+ chunkWriterImpl.write(time,
workingListForFlush.getLong(sortedRowIndex));
dataSizeInCurrentChunk += 8L + 8L;
break;
case FLOAT:
- chunkWriterImpl.write(time, list.getFloat(sortedRowIndex));
+ chunkWriterImpl.write(time,
workingListForFlush.getFloat(sortedRowIndex));
dataSizeInCurrentChunk += 8L + 4L;
break;
case DOUBLE:
- chunkWriterImpl.write(time, list.getDouble(sortedRowIndex));
+ chunkWriterImpl.write(time,
workingListForFlush.getDouble(sortedRowIndex));
dataSizeInCurrentChunk += 8L + 8L;
break;
case TEXT:
case BLOB:
case STRING:
- Binary value = list.getBinary(sortedRowIndex);
+ Binary value = workingListForFlush.getBinary(sortedRowIndex);
chunkWriterImpl.write(time, value);
dataSizeInCurrentChunk += 8L + getBinarySize(value);
break;
@@ -473,8 +470,7 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
}
@Override
- public synchronized void encode(
- BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
+ public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo
encodeInfo, long[] times) {
if (TVLIST_SORT_THRESHOLD == 0) {
encodeWorkingTVList(
ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk,
encodeInfo.targetChunkSize);
@@ -488,7 +484,7 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
// create MultiTvListIterator. It need not handle float/double precision
here.
List<TVList> tvLists = new ArrayList<>(sortedList);
- tvLists.add(list);
+ tvLists.add(workingListForFlush);
MemPointIterator timeValuePairIterator =
MemPointIteratorFactory.create(
schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 8c732d4f08e..2c87fdb32ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -165,6 +165,18 @@ public abstract class AlignedTVList extends TVList {
return alignedTvList;
}
+ @Override
+ public synchronized AlignedTVList cloneForFlushSort() {
+ AlignedTVList cloneList = AlignedTVList.newAlignedList(new
ArrayList<>(dataTypes));
+ cloneAs(cloneList);
+ cloneList.timeDeletedCnt = this.timeDeletedCnt;
+ cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize;
+ cloneList.values = this.values;
+ cloneList.bitMaps = this.bitMaps;
+ cloneList.timeColDeletedMap = this.timeColDeletedMap;
+ return cloneList;
+ }
+
@Override
public synchronized AlignedTVList clone() {
AlignedTVList cloneList = AlignedTVList.newAlignedList(new
ArrayList<>(dataTypes));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index dc4ff5529d4..b274bc8ef65 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -63,6 +63,15 @@ public abstract class BinaryTVList extends TVList {
}
}
+ @Override
+ public synchronized TVList cloneForFlushSort() {
+ BinaryTVList cloneList = BinaryTVList.newList();
+ cloneAs(cloneList);
+ cloneList.bitMap = this.bitMap;
+ cloneList.values = this.values;
+ return cloneList;
+ }
+
@Override
public synchronized BinaryTVList clone() {
BinaryTVList cloneList = BinaryTVList.newList();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index b8eb0e508bf..5e3461acb2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -62,6 +62,15 @@ public abstract class BooleanTVList extends TVList {
}
}
+ @Override
+ public synchronized TVList cloneForFlushSort() {
+ BooleanTVList cloneList = BooleanTVList.newList();
+ cloneAs(cloneList);
+ cloneList.bitMap = this.bitMap;
+ cloneList.values = this.values;
+ return cloneList;
+ }
+
@Override
public synchronized BooleanTVList clone() {
BooleanTVList cloneList = BooleanTVList.newList();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index f61995ef062..753ca2020a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -63,6 +63,15 @@ public abstract class DoubleTVList extends TVList {
}
}
+ @Override
+ public synchronized TVList cloneForFlushSort() {
+ DoubleTVList cloneList = DoubleTVList.newList();
+ cloneAs(cloneList);
+ cloneList.bitMap = this.bitMap;
+ cloneList.values = this.values;
+ return cloneList;
+ }
+
@Override
public synchronized DoubleTVList clone() {
DoubleTVList cloneList = DoubleTVList.newList();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 3623fa49a3e..517dc208211 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -63,6 +63,15 @@ public abstract class FloatTVList extends TVList {
}
}
+ @Override
+ public synchronized TVList cloneForFlushSort() {
+ FloatTVList cloneList = FloatTVList.newList();
+ cloneAs(cloneList);
+ cloneList.bitMap = this.bitMap;
+ cloneList.values = this.values;
+ return cloneList;
+ }
+
@Override
public synchronized FloatTVList clone() {
FloatTVList cloneList = FloatTVList.newList();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 0146ecf0e6b..a51e785d414 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -62,6 +62,15 @@ public abstract class IntTVList extends TVList {
}
}
+ @Override
+ public synchronized TVList cloneForFlushSort() {
+ IntTVList cloneList = IntTVList.newList(dataType);
+ cloneAs(cloneList);
+ cloneList.bitMap = this.bitMap;
+ cloneList.values = this.values;
+ return cloneList;
+ }
+
@Override
public synchronized IntTVList clone() {
IntTVList cloneList = IntTVList.newList(dataType);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 7b4bd8d82d2..544d2cb7dd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -62,6 +62,15 @@ public abstract class LongTVList extends TVList {
}
}
+ @Override
+ public synchronized TVList cloneForFlushSort() {
+ LongTVList cloneList = LongTVList.newList();
+ cloneAs(cloneList);
+ cloneList.bitMap = this.bitMap;
+ cloneList.values = this.values;
+ return cloneList;
+ }
+
@Override
public synchronized LongTVList clone() {
LongTVList cloneList = LongTVList.newList();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d61611c4cc5..b3bedd62e8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -520,6 +520,8 @@ public abstract class TVList implements WALEntryValue {
protected abstract void expandValues();
+ public abstract TVList cloneForFlushSort();
+
@Override
public abstract TVList clone();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index 7e77abca29e..169dd11042f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -205,6 +205,54 @@ public class PrimitiveMemTableTest {
}
}
+ @Test
+ public void
testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution()
+ throws QueryProcessException, IOException, IllegalPathException {
+
+ PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+ List<IMeasurementSchema> measurementSchemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32));
+ for (int i = 1000; i < 2000; i++) {
+ memTable.writeAlignedRow(
+ new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new
Object[] {i, i, i});
+ }
+ for (int i = 100; i < 200; i++) {
+ memTable.writeAlignedRow(
+ new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new
Object[] {i, i, i});
+ }
+ memTable.delete(
+ new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1",
TSDataType.INT32), 150, 160));
+ memTable.delete(
+ new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2",
TSDataType.INT32), 150, 160));
+ memTable.delete(
+ new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3",
TSDataType.INT32), 150, 160));
+ ResourceByPathUtils resourcesByPathUtils =
+ ResourceByPathUtils.getResourceInstance(
+ new AlignedFullPath(
+ new StringArrayDeviceID("root.test.d1"),
+ Arrays.asList("s1", "s2", "s3"),
+ measurementSchemas));
+ ReadOnlyMemChunk readOnlyMemChunk =
+ resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+ new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null);
+
+ for (int i = 1; i <= 50; i++) {
+ memTable.writeAlignedRow(
+ new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new
Object[] {i, i, i});
+ }
+ memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"),
"").sortTvListForFlush();
+
+ readOnlyMemChunk.sortTvLists();
+
+ MemPointIterator memPointIterator =
readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null);
+ while (memPointIterator.hasNextBatch()) {
+ memPointIterator.nextBatch();
+ }
+ }
+
@Test
public void memSeriesToStringTest() throws IOException {
TSDataType dataType = TSDataType.INT32;