This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixWriteFlushWithConcurrentQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 75bbe1ae9b36b7fad6072afafb3decfc535b1567 Author: shuwenwei <[email protected]> AuthorDate: Tue Feb 10 10:21:51 2026 +0800 Fixed concurrency issues caused by write and flush sorting during query execution --- .../memtable/AbstractWritableMemChunk.java | 21 ++++++++- .../memtable/AlignedWritableMemChunk.java | 11 ++--- .../dataregion/memtable/WritableMemChunk.java | 10 +---- .../db/utils/datastructure/AlignedTVList.java | 12 ++++++ .../iotdb/db/utils/datastructure/BinaryTVList.java | 9 ++++ .../db/utils/datastructure/BooleanTVList.java | 9 ++++ .../iotdb/db/utils/datastructure/DoubleTVList.java | 9 ++++ .../iotdb/db/utils/datastructure/FloatTVList.java | 9 ++++ .../iotdb/db/utils/datastructure/IntTVList.java | 9 ++++ .../iotdb/db/utils/datastructure/LongTVList.java | 9 ++++ .../iotdb/db/utils/datastructure/TVList.java | 2 + .../dataregion/memtable/PrimitiveMemTableTest.java | 50 ++++++++++++++++++++++ 12 files changed, 144 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index f33182fc680..fe494004529 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -47,6 +47,8 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { protected static long RETRY_INTERVAL_MS = 100L; protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; + protected TVList listForFlushSort; + /** * Release the TVList if there is no query on it. Otherwise, it should set the first query as the * owner. TVList is released until all queries finish. If it throws memory-not-enough exception @@ -198,7 +200,24 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { public abstract IMeasurementSchema getSchema(); @Override - public abstract void sortTvListForFlush(); + public synchronized void sortTvListForFlush() { + TVList workingList = getWorkingTVList(); + if (workingList.isSorted()) { + listForFlushSort = workingList; + return; + } + + boolean needCloneTimesAndIndicesInWorkingTVList; + workingList.lockQueryList(); + try { + needCloneTimesAndIndicesInWorkingTVList = !workingList.getQueryContextSet().isEmpty(); + } finally { + workingList.unlockQueryList(); + } + listForFlushSort = + needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; + listForFlushSort.sort(); + } @Override public abstract int delete(long lowerBound, long upperBound); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 5fa0008ce75..498698f19ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -499,13 +499,6 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { return minTime; } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public int delete(long lowerBound, long upperBound) { int deletedNumber = list.delete(lowerBound, upperBound); @@ -557,6 +550,8 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { long maxNumberOfPointsInChunk, int maxNumberOfPointsInPage) { BitMap allValueColDeletedMap; + AlignedTVList list = (AlignedTVList) listForFlushSort; + allValueColDeletedMap = ignoreAllNullRows ? list.getAllValueColDeletedMap() : null; boolean[] timeDuplicateInfo = null; @@ -623,6 +618,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { boolean[] timeDuplicateInfo, BitMap allValueColDeletedMap, int maxNumberOfPointsInPage) { + AlignedTVList list = (AlignedTVList) listForFlushSort; List<TSDataType> dataTypes = list.getTsDataTypes(); Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; for (List<Integer> pageRange : chunkRange) { @@ -754,6 +750,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { @Override public synchronized void encode( BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + AlignedTVList list = (AlignedTVList) listForFlushSort; encodeInfo.maxNumberOfPointsInChunk = Math.min( encodeInfo.maxNumberOfPointsInChunk, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 0256d5b16e5..0411690adb1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -262,13 +262,6 @@ public class WritableMemChunk extends AbstractWritableMemChunk { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public TVList getWorkingTVList() { return list; @@ -391,6 +384,7 @@ public class WritableMemChunk extends AbstractWritableMemChunk { public void encodeWorkingTVList( BlockingQueue<Object> ioTaskQueue, long maxNumberOfPointsInChunk, long targetChunkSize) { + TVList list = listForFlushSort; TSDataType tsDataType = schema.getType(); ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); long dataSizeInCurrentChunk = 0; @@ -488,7 +482,7 @@ public class WritableMemChunk extends AbstractWritableMemChunk { // create MultiTvListIterator. It need not handle float/double precision here. List<TVList> tvLists = new ArrayList<>(sortedList); - tvLists.add(list); + tvLists.add(listForFlushSort); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 8c732d4f08e..fdbebb70960 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -165,6 +165,18 @@ public abstract class AlignedTVList extends TVList { return alignedTvList; } + @Override + public AlignedTVList cloneForFlushSort() { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneList.timeDeletedCnt = this.timeDeletedCnt; + cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize; + cloneList.values = this.values; + cloneList.bitMaps = this.bitMaps; + cloneList.timeColDeletedMap = this.timeColDeletedMap; + return cloneList; + } + @Override public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index dc4ff5529d4..7899565b75c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -63,6 +63,15 @@ public abstract class BinaryTVList extends TVList { } } + @Override + public TVList cloneForFlushSort() { + BinaryTVList cloneList = BinaryTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BinaryTVList clone() { BinaryTVList cloneList = BinaryTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index b8eb0e508bf..a4cc03401bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -62,6 +62,15 @@ public abstract class BooleanTVList extends TVList { } } + @Override + public TVList cloneForFlushSort() { + BooleanTVList cloneList = BooleanTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BooleanTVList clone() { BooleanTVList cloneList = BooleanTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index f61995ef062..9897e11db56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -63,6 +63,15 @@ public abstract class DoubleTVList extends TVList { } } + @Override + public TVList cloneForFlushSort() { + DoubleTVList cloneList = DoubleTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized DoubleTVList clone() { DoubleTVList cloneList = DoubleTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index 3623fa49a3e..857668cd3a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -63,6 +63,15 @@ public abstract class FloatTVList extends TVList { } } + @Override + public TVList cloneForFlushSort() { + FloatTVList cloneList = FloatTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized FloatTVList clone() { FloatTVList cloneList = FloatTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index 0146ecf0e6b..172f0920468 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -62,6 +62,15 @@ public abstract class IntTVList extends TVList { } } + @Override + public TVList cloneForFlushSort() { + IntTVList cloneList = IntTVList.newList(dataType); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized IntTVList clone() { IntTVList cloneList = IntTVList.newList(dataType); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index 7b4bd8d82d2..74789b1842f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -62,6 +62,15 @@ public abstract class LongTVList extends TVList { } } + @Override + public TVList cloneForFlushSort() { + LongTVList cloneList = LongTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized LongTVList clone() { LongTVList cloneList = LongTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index d61611c4cc5..b3bedd62e8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -520,6 +520,8 @@ public abstract class TVList implements WALEntryValue { protected abstract void expandValues(); + public abstract TVList cloneForFlushSort(); + @Override public abstract TVList clone(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index b16e20d4f85..0412981433a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -205,6 +205,56 @@ public class PrimitiveMemTableTest { } } + @Test + public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() + throws QueryProcessException, IOException, IllegalPathException { + + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + for (int i = 1000; i < 2000; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + for (int i = 100; i < 200; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", TSDataType.INT32), 150, 160)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", TSDataType.INT32), 150, 160)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", TSDataType.INT32), 150, 160)); + ResourceByPathUtils resourcesByPathUtils = + ResourceByPathUtils.getResourceInstance( + new AlignedFullPath( + new StringArrayDeviceID("root.test.d1"), + Arrays.asList("s1", "s2", "s3"), + measurementSchemas)); + ReadOnlyMemChunk readOnlyMemChunk = + resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( + new QueryContext(1), memTable, null, Long.MAX_VALUE, null); + + for (int i = 1; i <= 50; i++) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s1").sortTvListForFlush(); + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s2").sortTvListForFlush(); + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s3").sortTvListForFlush(); + + readOnlyMemChunk.sortTvLists(); + + MemPointIterator memPointIterator = readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null); + while (memPointIterator.hasNextBatch()) { + memPointIterator.nextBatch(); + } + } + @Test public void memSeriesToStringTest() throws IOException { TSDataType dataType = TSDataType.INT32;
