This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixBug0902-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 40476eda32eeb147b8d31fa39b5dc766bc0a8953 Author: shuwenwei <[email protected]> AuthorDate: Tue Sep 2 16:21:43 2025 +0800 Concurrently querying and writing to the MemTable may cause the query results to be out of order --- .../memtable/AlignedReadOnlyMemChunk.java | 30 +++---- .../dataregion/memtable/ReadOnlyMemChunk.java | 20 ++--- .../db/utils/datastructure/AlignedTVList.java | 5 +- .../datastructure/MemPointIteratorFactory.java | 91 +++++++++++++++++----- .../MergeSortMultiAlignedTVListIterator.java | 2 + .../MergeSortMultiTVListIterator.java | 2 + .../datastructure/MultiAlignedTVListIterator.java | 6 +- .../utils/datastructure/MultiTVListIterator.java | 22 +++++- .../OrderedMultiAlignedTVListIterator.java | 2 + .../datastructure/OrderedMultiTVListIterator.java | 2 + .../iotdb/db/utils/datastructure/TVList.java | 3 + .../memtable/AlignedTVListIteratorTest.java | 8 +- .../memtable/NonAlignedTVListIteratorTest.java | 8 +- 13 files changed, 133 insertions(+), 68 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java index db19d099f18..6d13b49110e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java @@ -50,7 +50,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk { private final String timeChunkName; @@ -361,21 +360,7 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk { } private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws IOException { - List<AlignedTVList> alignedTvLists = - 
alignedTvListQueryMap.keySet().stream() - .map(x -> (AlignedTVList) x) - .collect(Collectors.toList()); - MemPointIterator timeValuePairIterator = - MemPointIteratorFactory.create( - dataTypes, - columnIndexList, - alignedTvLists, - Ordering.ASC, - null, - valueColumnsDeletionList, - floatPrecision, - encodingList, - MAX_NUMBER_OF_POINTS_IN_PAGE); + MemPointIterator timeValuePairIterator = createMemPointIterator(Ordering.ASC, null); while (timeValuePairIterator.hasNextTimeValuePair()) { TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair(); @@ -453,14 +438,17 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk { @Override public MemPointIterator createMemPointIterator(Ordering scanOrder, Filter globalTimeFilter) { - List<AlignedTVList> alignedTvLists = - alignedTvListQueryMap.keySet().stream() - .map(x -> (AlignedTVList) x) - .collect(Collectors.toList()); + List<AlignedTVList> tvLists = new ArrayList<>(alignedTvListQueryMap.size()); + List<Integer> tvListRowCounts = new ArrayList<>(alignedTvListQueryMap.size()); + for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) { + tvLists.add((AlignedTVList) entry.getKey()); + tvListRowCounts.add(entry.getValue()); + } return MemPointIteratorFactory.create( dataTypes, columnIndexList, - alignedTvLists, + tvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java index 5ba6f1d5ce9..64a8868c8c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java @@ -291,17 +291,7 @@ public class ReadOnlyMemChunk { // read all data in memory chunk and write to 
tsblock private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws IOException { - List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet()); - MemPointIterator timeValuePairIterator = - MemPointIteratorFactory.create( - getDataType(), - tvLists, - Ordering.ASC, - null, - deletionList, - floatPrecision, - encoding, - MAX_NUMBER_OF_POINTS_IN_PAGE); + MemPointIterator timeValuePairIterator = createMemPointIterator(Ordering.ASC, null); while (timeValuePairIterator.hasNextTimeValuePair()) { TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair(); @@ -378,10 +368,16 @@ public class ReadOnlyMemChunk { } public MemPointIterator createMemPointIterator(Ordering scanOrder, Filter globalTimeFilter) { - List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet()); + List<TVList> tvLists = new ArrayList<>(tvListQueryMap.size()); + List<Integer> tvListRowCounts = new ArrayList<>(tvListQueryMap.size()); + for (Map.Entry<TVList, Integer> entry : tvListQueryMap.entrySet()) { + tvLists.add(entry.getKey()); + tvListRowCounts.add(entry.getValue()); + } return MemPointIteratorFactory.create( dataType, tvLists, + tvListRowCounts, scanOrder, globalTimeFilter, deletionList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 2b643a52128..4971775fefa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -1334,6 +1334,7 @@ public abstract class AlignedTVList extends TVList { public AlignedTVListIterator iterator( Ordering scanOrder, + int rowCount, Filter globalTimeFilter, List<TSDataType> dataTypeList, List<Integer> columnIndexList, @@ -1343,6 +1344,7 @@ public abstract class AlignedTVList extends TVList { int maxNumberOfPointsInPage) { return new 
AlignedTVListIterator( scanOrder, + rowCount, globalTimeFilter, dataTypeList, columnIndexList, @@ -1369,6 +1371,7 @@ public abstract class AlignedTVList extends TVList { public AlignedTVListIterator( Ordering scanOrder, + int rowCount, Filter globalTimeFilter, List<TSDataType> dataTypeList, List<Integer> columnIndexList, @@ -1376,7 +1379,7 @@ public abstract class AlignedTVList extends TVList { Integer floatPrecision, List<TSEncoding> encodingList, int maxNumberOfPointsInPage) { - super(scanOrder, globalTimeFilter, null, null, null, maxNumberOfPointsInPage); + super(scanOrder, rowCount, globalTimeFilter, null, null, null, maxNumberOfPointsInPage); this.dataTypeList = dataTypeList; this.columnIndexList = (columnIndexList == null) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java index 4d549afb2fd..14efebb19c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java @@ -35,18 +35,29 @@ public class MemPointIteratorFactory { // TVListIterator private static MemPointIterator single(List<TVList> tvLists, int maxNumberOfPointsInPage) { - return tvLists.get(0).iterator(Ordering.ASC, null, null, null, null, maxNumberOfPointsInPage); + return tvLists + .get(0) + .iterator( + Ordering.ASC, tvLists.get(0).rowCount, null, null, null, null, maxNumberOfPointsInPage); } private static MemPointIterator single( List<TVList> tvLists, List<TimeRange> deletionList, int maxNumberOfPointsInPage) { return tvLists .get(0) - .iterator(Ordering.ASC, null, deletionList, null, null, maxNumberOfPointsInPage); + .iterator( + Ordering.ASC, + tvLists.get(0).rowCount, + null, + deletionList, + null, + null, + maxNumberOfPointsInPage); } private static 
MemPointIterator single( List<TVList> tvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<TimeRange> deletionList, @@ -57,6 +68,7 @@ public class MemPointIteratorFactory { .get(0) .iterator( scanOrder, + tvListRowCounts.get(0), globalTimeFilter, deletionList, floatPrecision, @@ -68,7 +80,7 @@ public class MemPointIteratorFactory { private static MemPointIterator mergeSort( TSDataType tsDataType, List<TVList> tvLists, int maxNumberOfPointsInPage) { return new MergeSortMultiTVListIterator( - Ordering.ASC, null, tsDataType, tvLists, null, null, null, maxNumberOfPointsInPage); + Ordering.ASC, null, tsDataType, tvLists, null, null, null, null, maxNumberOfPointsInPage); } private static MemPointIterator mergeSort( @@ -77,12 +89,21 @@ public class MemPointIteratorFactory { List<TimeRange> deletionList, int maxNumberOfPointsInPage) { return new MergeSortMultiTVListIterator( - Ordering.ASC, null, tsDataType, tvLists, deletionList, null, null, maxNumberOfPointsInPage); + Ordering.ASC, + null, + tsDataType, + tvLists, + null, + deletionList, + null, + null, + maxNumberOfPointsInPage); } private static MemPointIterator mergeSort( TSDataType tsDataType, List<TVList> tvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<TimeRange> deletionList, @@ -94,6 +115,7 @@ public class MemPointIteratorFactory { globalTimeFilter, tsDataType, tvLists, + tvListRowCounts, deletionList, floatPrecision, encoding, @@ -104,7 +126,7 @@ public class MemPointIteratorFactory { private static MemPointIterator ordered( TSDataType tsDataType, List<TVList> tvLists, int maxNumberOfPointsInPage) { return new OrderedMultiTVListIterator( - Ordering.ASC, null, tsDataType, tvLists, null, null, null, maxNumberOfPointsInPage); + Ordering.ASC, null, tsDataType, tvLists, null, null, null, null, maxNumberOfPointsInPage); } private static MemPointIterator ordered( @@ -113,12 +135,21 @@ public class MemPointIteratorFactory { List<TimeRange> 
deletionList, int maxNumberOfPointsInPage) { return new OrderedMultiTVListIterator( - Ordering.ASC, null, tsDataType, tvLists, deletionList, null, null, maxNumberOfPointsInPage); + Ordering.ASC, + null, + tsDataType, + tvLists, + null, + deletionList, + null, + null, + maxNumberOfPointsInPage); } private static MemPointIterator ordered( TSDataType tsDataType, List<TVList> tvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<TimeRange> deletionList, @@ -130,6 +161,7 @@ public class MemPointIteratorFactory { globalTimeFilter, tsDataType, tvLists, + tvListRowCounts, deletionList, floatPrecision, encoding, @@ -146,6 +178,7 @@ public class MemPointIteratorFactory { .get(0) .iterator( Ordering.ASC, + alignedTvLists.get(0).rowCount, null, tsDataTypes, columnIndexList, @@ -165,6 +198,7 @@ public class MemPointIteratorFactory { .get(0) .iterator( Ordering.ASC, + alignedTvLists.get(0).rowCount, null, tsDataTypes, columnIndexList, @@ -178,6 +212,7 @@ public class MemPointIteratorFactory { List<TSDataType> tsDataTypes, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<List<TimeRange>> valueColumnsDeletionList, @@ -188,6 +223,7 @@ public class MemPointIteratorFactory { .get(0) .iterator( scanOrder, + tvListRowCounts.get(0), globalTimeFilter, tsDataTypes, columnIndexList, @@ -207,6 +243,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + null, Ordering.ASC, null, null, @@ -225,6 +262,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + null, Ordering.ASC, null, valueColumnsDeletionList, @@ -237,6 +275,7 @@ public class MemPointIteratorFactory { List<TSDataType> tsDataTypes, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<List<TimeRange>> valueColumnsDeletionList, 
@@ -247,6 +286,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, @@ -265,6 +305,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + null, Ordering.ASC, null, null, @@ -283,6 +324,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + null, Ordering.ASC, null, valueColumnsDeletionList, @@ -295,6 +337,7 @@ public class MemPointIteratorFactory { List<TSDataType> tsDataTypes, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<List<TimeRange>> valueColumnsDeletionList, @@ -305,6 +348,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, @@ -317,7 +361,7 @@ public class MemPointIteratorFactory { TSDataType tsDataType, List<TVList> tvLists, int maxNumberOfPointsInPage) { if (tvLists.size() == 1) { return single(tvLists, maxNumberOfPointsInPage); - } else if (isCompleteOrdered(tvLists)) { + } else if (isCompleteOrdered(tvLists, null)) { return ordered(tsDataType, tvLists, maxNumberOfPointsInPage); } else { return mergeSort(tsDataType, tvLists, maxNumberOfPointsInPage); @@ -331,7 +375,7 @@ public class MemPointIteratorFactory { int maxNumberOfPointsInPage) { if (tvLists.size() == 1) { return single(tvLists, deletionList, maxNumberOfPointsInPage); - } else if (isCompleteOrdered(tvLists)) { + } else if (isCompleteOrdered(tvLists, null)) { return ordered(tsDataType, tvLists, deletionList, maxNumberOfPointsInPage); } else { return mergeSort(tsDataType, tvLists, deletionList, maxNumberOfPointsInPage); @@ -341,6 +385,7 @@ public class MemPointIteratorFactory { public static MemPointIterator create( TSDataType tsDataType, List<TVList> tvLists, + List<Integer> tvListRowCounts, Ordering 
scanOrder, Filter globalTimeFilter, List<TimeRange> deletionList, @@ -350,16 +395,18 @@ public class MemPointIteratorFactory { if (tvLists.size() == 1) { return single( tvLists, + tvListRowCounts, scanOrder, globalTimeFilter, deletionList, floatPrecision, encoding, maxNumberOfPointsInPage); - } else if (isCompleteOrdered(tvLists)) { + } else if (isCompleteOrdered(tvLists, tvListRowCounts)) { return ordered( tsDataType, tvLists, + tvListRowCounts, scanOrder, globalTimeFilter, deletionList, @@ -370,6 +417,7 @@ public class MemPointIteratorFactory { return mergeSort( tsDataType, tvLists, + tvListRowCounts, scanOrder, globalTimeFilter, deletionList, @@ -386,7 +434,7 @@ public class MemPointIteratorFactory { int maxNumberOfPointsInPage) { if (alignedTvLists.size() == 1) { return single(tsDataTypes, columnIndexList, alignedTvLists, maxNumberOfPointsInPage); - } else if (isCompleteOrdered(alignedTvLists)) { + } else if (isCompleteOrdered(alignedTvLists, null)) { return ordered(tsDataTypes, columnIndexList, alignedTvLists, maxNumberOfPointsInPage); } else { return mergeSort(tsDataTypes, columnIndexList, alignedTvLists, maxNumberOfPointsInPage); @@ -406,7 +454,7 @@ public class MemPointIteratorFactory { alignedTvLists, valueColumnsDeletionList, maxNumberOfPointsInPage); - } else if (isCompleteOrdered(alignedTvLists)) { + } else if (isCompleteOrdered(alignedTvLists, null)) { return ordered( tsDataTypes, columnIndexList, @@ -427,6 +475,7 @@ public class MemPointIteratorFactory { List<TSDataType> tsDataTypes, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<List<TimeRange>> valueColumnsDeletionList, @@ -438,17 +487,19 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, floatPrecision, encodingList, maxNumberOfPointsInPage); - } else if 
(isCompleteOrdered(alignedTvLists)) { + } else if (isCompleteOrdered(alignedTvLists, tvListRowCounts)) { return ordered( tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, @@ -460,6 +511,7 @@ public class MemPointIteratorFactory { tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, @@ -469,21 +521,24 @@ public class MemPointIteratorFactory { } } - private static boolean isCompleteOrdered(List<? extends TVList> tvLists) { + private static boolean isCompleteOrdered( + List<? extends TVList> tvLists, List<Integer> tvListRowCounts) { long time = Long.MIN_VALUE; for (int i = 0; i < tvLists.size(); i++) { TVList list = tvLists.get(i); - if (!list.isSorted()) { - return false; - } + int rowCount = tvListRowCounts == null ? list.rowCount() : tvListRowCounts.get(i); - if (tvLists.get(i).rowCount() == 0) { + if (rowCount == 0) { continue; } + if (list.seqRowCount() < rowCount) { + return false; + } + if (i > 0 && list.getTime(0) <= time) { return false; } - time = list.getTime(list.rowCount() - 1); + time = list.getTime(rowCount - 1); } return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java index bf96e822b06..c507bd75d50 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java @@ -54,6 +54,7 @@ public class MergeSortMultiAlignedTVListIterator extends MultiAlignedTVListItera List<TSDataType> tsDataTypes, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, 
List<List<TimeRange>> valueColumnsDeletionList, @@ -64,6 +65,7 @@ public class MergeSortMultiAlignedTVListIterator extends MultiAlignedTVListItera tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java index 44fa0d47807..2e33811c257 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java @@ -47,6 +47,7 @@ public class MergeSortMultiTVListIterator extends MultiTVListIterator { Filter globalTimeFilter, TSDataType tsDataType, List<TVList> tvLists, + List<Integer> tvListRowCounts, List<TimeRange> deletionList, Integer floatPrecision, TSEncoding encoding, @@ -56,6 +57,7 @@ public class MergeSortMultiTVListIterator extends MultiTVListIterator { globalTimeFilter, tsDataType, tvLists, + tvListRowCounts, deletionList, floatPrecision, encoding, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java index 64ed298da24..a6019b5465f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java @@ -60,6 +60,7 @@ public abstract class MultiAlignedTVListIterator extends MemPointIterator { List<TSDataType> tsDataTypeList, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, 
List<List<TimeRange>> valueColumnsDeletionList, @@ -71,10 +72,12 @@ public abstract class MultiAlignedTVListIterator extends MemPointIterator { this.columnIndexList = columnIndexList; this.alignedTvListIterators = new ArrayList<>(alignedTvLists.size()); if (scanOrder.isAscending()) { - for (AlignedTVList alignedTVList : alignedTvLists) { + for (int i = 0; i < alignedTvLists.size(); i++) { + AlignedTVList alignedTVList = alignedTvLists.get(i); AlignedTVList.AlignedTVListIterator iterator = alignedTVList.iterator( scanOrder, + tvListRowCounts.get(i), globalTimeFilter, tsDataTypeList, columnIndexList, @@ -90,6 +93,7 @@ public abstract class MultiAlignedTVListIterator extends MemPointIterator { AlignedTVList.AlignedTVListIterator iterator = alignedTVList.iterator( scanOrder, + tvListRowCounts.get(i), globalTimeFilter, tsDataTypeList, columnIndexList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java index a8d5879b767..b735cc7927b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java @@ -56,6 +56,7 @@ public abstract class MultiTVListIterator extends MemPointIterator { Filter globalTimeFilter, TSDataType tsDataType, List<TVList> tvLists, + List<Integer> tvListRowCounts, List<TimeRange> deletionList, Integer floatPrecision, TSEncoding encoding, @@ -64,18 +65,33 @@ public abstract class MultiTVListIterator extends MemPointIterator { this.tsDataType = tsDataType; this.tvListIterators = new ArrayList<>(tvLists.size()); if (scanOrder.isAscending()) { - for (TVList tvList : tvLists) { + for (int i = 0; i < tvLists.size(); i++) { + TVList tvList = tvLists.get(i); + int rowCount = tvListRowCounts == null ? 
tvList.rowCount : tvListRowCounts.get(i); TVList.TVListIterator iterator = tvList.iterator( - scanOrder, globalTimeFilter, deletionList, null, null, maxNumberOfPointsInPage); + scanOrder, + rowCount, + globalTimeFilter, + deletionList, + null, + null, + maxNumberOfPointsInPage); tvListIterators.add(iterator); } } else { for (int i = tvLists.size() - 1; i >= 0; i--) { TVList tvList = tvLists.get(i); + int rowCount = tvListRowCounts == null ? tvList.rowCount : tvListRowCounts.get(i); TVList.TVListIterator iterator = tvList.iterator( - scanOrder, globalTimeFilter, deletionList, null, null, maxNumberOfPointsInPage); + scanOrder, + rowCount, + globalTimeFilter, + deletionList, + null, + null, + maxNumberOfPointsInPage); tvListIterators.add(iterator); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java index c138b1ff07b..46d8c5f02ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java @@ -41,6 +41,7 @@ public class OrderedMultiAlignedTVListIterator extends MultiAlignedTVListIterato List<TSDataType> tsDataTypes, List<Integer> columnIndexList, List<AlignedTVList> alignedTvLists, + List<Integer> tvListRowCounts, Ordering scanOrder, Filter globalTimeFilter, List<List<TimeRange>> valueColumnsDeletionList, @@ -51,6 +52,7 @@ public class OrderedMultiAlignedTVListIterator extends MultiAlignedTVListIterato tsDataTypes, columnIndexList, alignedTvLists, + tvListRowCounts, scanOrder, globalTimeFilter, valueColumnsDeletionList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java index c7ae99386c5..61ae5f1360d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java @@ -35,6 +35,7 @@ public class OrderedMultiTVListIterator extends MultiTVListIterator { Filter globalTimeFilter, TSDataType tsDataType, List<TVList> tvLists, + List<Integer> tvListRowCounts, List<TimeRange> deletionList, Integer floatPrecision, TSEncoding encoding, @@ -44,6 +45,7 @@ public class OrderedMultiTVListIterator extends MultiTVListIterator { globalTimeFilter, tsDataType, tvLists, + tvListRowCounts, deletionList, floatPrecision, encoding, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 3d5ee978469..7db4791571c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -650,6 +650,7 @@ public abstract class TVList implements WALEntryValue { public TVListIterator iterator( Ordering scanOrder, + int rowCount, Filter globalTimeFilter, List<TimeRange> deletionList, Integer floatPrecision, @@ -657,6 +658,7 @@ public abstract class TVList implements WALEntryValue { int maxNumberOfPointsInPage) { return new TVListIterator( scanOrder, + rowCount, globalTimeFilter, deletionList, floatPrecision, @@ -681,6 +683,7 @@ public abstract class TVList implements WALEntryValue { public TVListIterator( Ordering scanOrder, + int rowCount, Filter globalTimeFilter, List<TimeRange> deletionList, Integer floatPrecision, diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java index 6e3e21ff904..59e58182f6f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java @@ -738,7 +738,6 @@ public class AlignedTVListIteratorTest { AlignedTVList alignedTVList = AlignedTVList.newAlignedList( Arrays.asList(TSDataType.INT64, TSDataType.BOOLEAN, TSDataType.BOOLEAN)); - int rowCount = 0; for (TimeRange timeRange : timeRanges) { long start = timeRange.getMin(); long end = timeRange.getMax(); @@ -757,11 +756,10 @@ public class AlignedTVListIteratorTest { } alignedTVList.putAlignedValue( timestamp, new Object[] {timestamp, timestamp % 2 == 0, true}); - rowCount++; } } Map<TVList, Integer> tvListMap = new HashMap<>(); - tvListMap.put(alignedTVList, rowCount); + tvListMap.put(alignedTVList, alignedTVList.rowCount()); return tvListMap; } @@ -772,7 +770,6 @@ public class AlignedTVListIteratorTest { AlignedTVList alignedTVList = AlignedTVList.newAlignedList( Arrays.asList(TSDataType.INT64, TSDataType.BOOLEAN, TSDataType.BOOLEAN)); - int rowCount = 0; long start = timeRange.getMin(); long end = timeRange.getMax(); List<Long> timestamps = new ArrayList<>((int) (end - start + 1)); @@ -790,9 +787,8 @@ public class AlignedTVListIteratorTest { } alignedTVList.putAlignedValue( timestamp, new Object[] {timestamp, timestamp % 2 == 0, isLast}); - rowCount++; } - tvListMap.put(alignedTVList, rowCount); + tvListMap.put(alignedTVList, alignedTVList.rowCount()); } return tvListMap; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java index 62c801f4e34..a984eaada1f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java @@ -582,7 +582,6 @@ public class NonAlignedTVListIteratorTest { private static Map<TVList, Integer> buildNonAlignedSingleTvListMap(List<TimeRange> timeRanges) { TVList tvList = TVList.newList(TSDataType.INT64); - int rowCount = 0; for (TimeRange timeRange : timeRanges) { long start = timeRange.getMin(); long end = timeRange.getMax(); @@ -599,11 +598,10 @@ public class NonAlignedTVListIteratorTest { } } tvList.putLong(timestamp, timestamp); - rowCount++; } } Map<TVList, Integer> tvListMap = new HashMap<>(); - tvListMap.put(tvList, rowCount); + tvListMap.put(tvList, tvList.rowCount()); return tvListMap; } @@ -611,7 +609,6 @@ public class NonAlignedTVListIteratorTest { Map<TVList, Integer> tvListMap = new LinkedHashMap<>(); for (TimeRange timeRange : timeRanges) { TVList tvList = TVList.newList(TSDataType.INT64); - int rowCount = 0; long start = timeRange.getMin(); long end = timeRange.getMax(); List<Long> timestamps = new ArrayList<>((int) (end - start + 1)); @@ -621,9 +618,8 @@ public class NonAlignedTVListIteratorTest { Collections.shuffle(timestamps); for (Long timestamp : timestamps) { tvList.putLong(timestamp, timestamp); - rowCount++; } - tvListMap.put(tvList, rowCount); + tvListMap.put(tvList, tvList.rowCount()); } return tvListMap; }
