This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c2c50269fce [To dev/1.3] Concurrently querying and writing to the
memtable may cause the query results to be out of order
c2c50269fce is described below
commit c2c50269fceb5086def7316f9c196fe24f4adec6
Author: shuwenwei <[email protected]>
AuthorDate: Wed Sep 3 10:15:13 2025 +0800
[To dev/1.3] Concurrently querying and writing to the memtable may cause
the query results to be out of order
---
.../memtable/AlignedReadOnlyMemChunk.java | 30 +++----
.../dataregion/memtable/ReadOnlyMemChunk.java | 20 ++---
.../db/utils/datastructure/AlignedTVList.java | 5 +-
.../datastructure/MemPointIteratorFactory.java | 91 +++++++++++++++++-----
.../MergeSortMultiAlignedTVListIterator.java | 2 +
.../MergeSortMultiTVListIterator.java | 2 +
.../datastructure/MultiAlignedTVListIterator.java | 6 +-
.../utils/datastructure/MultiTVListIterator.java | 22 +++++-
.../OrderedMultiAlignedTVListIterator.java | 2 +
.../datastructure/OrderedMultiTVListIterator.java | 2 +
.../iotdb/db/utils/datastructure/TVList.java | 9 +--
.../memtable/AlignedTVListIteratorTest.java | 8 +-
.../memtable/NonAlignedTVListIteratorTest.java | 8 +-
13 files changed, 134 insertions(+), 73 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index db19d099f18..6d13b49110e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -50,7 +50,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk {
private final String timeChunkName;
@@ -361,21 +360,7 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
}
private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws
IOException {
- List<AlignedTVList> alignedTvLists =
- alignedTvListQueryMap.keySet().stream()
- .map(x -> (AlignedTVList) x)
- .collect(Collectors.toList());
- MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(
- dataTypes,
- columnIndexList,
- alignedTvLists,
- Ordering.ASC,
- null,
- valueColumnsDeletionList,
- floatPrecision,
- encodingList,
- MAX_NUMBER_OF_POINTS_IN_PAGE);
+ MemPointIterator timeValuePairIterator =
createMemPointIterator(Ordering.ASC, null);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -453,14 +438,17 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
@Override
public MemPointIterator createMemPointIterator(Ordering scanOrder, Filter
globalTimeFilter) {
- List<AlignedTVList> alignedTvLists =
- alignedTvListQueryMap.keySet().stream()
- .map(x -> (AlignedTVList) x)
- .collect(Collectors.toList());
+ List<AlignedTVList> tvLists = new
ArrayList<>(alignedTvListQueryMap.size());
+ List<Integer> tvListRowCounts = new
ArrayList<>(alignedTvListQueryMap.size());
+ for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
+ tvLists.add((AlignedTVList) entry.getKey());
+ tvListRowCounts.add(entry.getValue());
+ }
return MemPointIteratorFactory.create(
dataTypes,
columnIndexList,
- alignedTvLists,
+ tvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 5ba6f1d5ce9..64a8868c8c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -291,17 +291,7 @@ public class ReadOnlyMemChunk {
// read all data in memory chunk and write to tsblock
private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws
IOException {
- List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
- MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(
- getDataType(),
- tvLists,
- Ordering.ASC,
- null,
- deletionList,
- floatPrecision,
- encoding,
- MAX_NUMBER_OF_POINTS_IN_PAGE);
+ MemPointIterator timeValuePairIterator =
createMemPointIterator(Ordering.ASC, null);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -378,10 +368,16 @@ public class ReadOnlyMemChunk {
}
public MemPointIterator createMemPointIterator(Ordering scanOrder, Filter
globalTimeFilter) {
- List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
+ List<TVList> tvLists = new ArrayList<>(tvListQueryMap.size());
+ List<Integer> tvListRowCounts = new ArrayList<>(tvListQueryMap.size());
+ for (Map.Entry<TVList, Integer> entry : tvListQueryMap.entrySet()) {
+ tvLists.add(entry.getKey());
+ tvListRowCounts.add(entry.getValue());
+ }
return MemPointIteratorFactory.create(
dataType,
tvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
deletionList,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 2b643a52128..4971775fefa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -1334,6 +1334,7 @@ public abstract class AlignedTVList extends TVList {
public AlignedTVListIterator iterator(
Ordering scanOrder,
+ int rowCount,
Filter globalTimeFilter,
List<TSDataType> dataTypeList,
List<Integer> columnIndexList,
@@ -1343,6 +1344,7 @@ public abstract class AlignedTVList extends TVList {
int maxNumberOfPointsInPage) {
return new AlignedTVListIterator(
scanOrder,
+ rowCount,
globalTimeFilter,
dataTypeList,
columnIndexList,
@@ -1369,6 +1371,7 @@ public abstract class AlignedTVList extends TVList {
public AlignedTVListIterator(
Ordering scanOrder,
+ int rowCount,
Filter globalTimeFilter,
List<TSDataType> dataTypeList,
List<Integer> columnIndexList,
@@ -1376,7 +1379,7 @@ public abstract class AlignedTVList extends TVList {
Integer floatPrecision,
List<TSEncoding> encodingList,
int maxNumberOfPointsInPage) {
- super(scanOrder, globalTimeFilter, null, null, null,
maxNumberOfPointsInPage);
+ super(scanOrder, rowCount, globalTimeFilter, null, null, null,
maxNumberOfPointsInPage);
this.dataTypeList = dataTypeList;
this.columnIndexList =
(columnIndexList == null)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
index 4d549afb2fd..14efebb19c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
@@ -35,18 +35,29 @@ public class MemPointIteratorFactory {
// TVListIterator
private static MemPointIterator single(List<TVList> tvLists, int
maxNumberOfPointsInPage) {
- return tvLists.get(0).iterator(Ordering.ASC, null, null, null, null,
maxNumberOfPointsInPage);
+ return tvLists
+ .get(0)
+ .iterator(
+ Ordering.ASC, tvLists.get(0).rowCount, null, null, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator single(
List<TVList> tvLists, List<TimeRange> deletionList, int
maxNumberOfPointsInPage) {
return tvLists
.get(0)
- .iterator(Ordering.ASC, null, deletionList, null, null,
maxNumberOfPointsInPage);
+ .iterator(
+ Ordering.ASC,
+ tvLists.get(0).rowCount,
+ null,
+ deletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator single(
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<TimeRange> deletionList,
@@ -57,6 +68,7 @@ public class MemPointIteratorFactory {
.get(0)
.iterator(
scanOrder,
+ tvListRowCounts.get(0),
globalTimeFilter,
deletionList,
floatPrecision,
@@ -68,7 +80,7 @@ public class MemPointIteratorFactory {
private static MemPointIterator mergeSort(
TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
return new MergeSortMultiTVListIterator(
- Ordering.ASC, null, tsDataType, tvLists, null, null, null,
maxNumberOfPointsInPage);
+ Ordering.ASC, null, tsDataType, tvLists, null, null, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
@@ -77,12 +89,21 @@ public class MemPointIteratorFactory {
List<TimeRange> deletionList,
int maxNumberOfPointsInPage) {
return new MergeSortMultiTVListIterator(
- Ordering.ASC, null, tsDataType, tvLists, deletionList, null, null,
maxNumberOfPointsInPage);
+ Ordering.ASC,
+ null,
+ tsDataType,
+ tvLists,
+ null,
+ deletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
TSDataType tsDataType,
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<TimeRange> deletionList,
@@ -94,6 +115,7 @@ public class MemPointIteratorFactory {
globalTimeFilter,
tsDataType,
tvLists,
+ tvListRowCounts,
deletionList,
floatPrecision,
encoding,
@@ -104,7 +126,7 @@ public class MemPointIteratorFactory {
private static MemPointIterator ordered(
TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
return new OrderedMultiTVListIterator(
- Ordering.ASC, null, tsDataType, tvLists, null, null, null,
maxNumberOfPointsInPage);
+ Ordering.ASC, null, tsDataType, tvLists, null, null, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
@@ -113,12 +135,21 @@ public class MemPointIteratorFactory {
List<TimeRange> deletionList,
int maxNumberOfPointsInPage) {
return new OrderedMultiTVListIterator(
- Ordering.ASC, null, tsDataType, tvLists, deletionList, null, null,
maxNumberOfPointsInPage);
+ Ordering.ASC,
+ null,
+ tsDataType,
+ tvLists,
+ null,
+ deletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
TSDataType tsDataType,
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<TimeRange> deletionList,
@@ -130,6 +161,7 @@ public class MemPointIteratorFactory {
globalTimeFilter,
tsDataType,
tvLists,
+ tvListRowCounts,
deletionList,
floatPrecision,
encoding,
@@ -146,6 +178,7 @@ public class MemPointIteratorFactory {
.get(0)
.iterator(
Ordering.ASC,
+ alignedTvLists.get(0).rowCount,
null,
tsDataTypes,
columnIndexList,
@@ -165,6 +198,7 @@ public class MemPointIteratorFactory {
.get(0)
.iterator(
Ordering.ASC,
+ alignedTvLists.get(0).rowCount,
null,
tsDataTypes,
columnIndexList,
@@ -178,6 +212,7 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -188,6 +223,7 @@ public class MemPointIteratorFactory {
.get(0)
.iterator(
scanOrder,
+ tvListRowCounts.get(0),
globalTimeFilter,
tsDataTypes,
columnIndexList,
@@ -207,6 +243,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ null,
Ordering.ASC,
null,
null,
@@ -225,6 +262,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ null,
Ordering.ASC,
null,
valueColumnsDeletionList,
@@ -237,6 +275,7 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -247,6 +286,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
@@ -265,6 +305,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ null,
Ordering.ASC,
null,
null,
@@ -283,6 +324,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ null,
Ordering.ASC,
null,
valueColumnsDeletionList,
@@ -295,6 +337,7 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -305,6 +348,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
@@ -317,7 +361,7 @@ public class MemPointIteratorFactory {
TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
return single(tvLists, maxNumberOfPointsInPage);
- } else if (isCompleteOrdered(tvLists)) {
+ } else if (isCompleteOrdered(tvLists, null)) {
return ordered(tsDataType, tvLists, maxNumberOfPointsInPage);
} else {
return mergeSort(tsDataType, tvLists, maxNumberOfPointsInPage);
@@ -331,7 +375,7 @@ public class MemPointIteratorFactory {
int maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
return single(tvLists, deletionList, maxNumberOfPointsInPage);
- } else if (isCompleteOrdered(tvLists)) {
+ } else if (isCompleteOrdered(tvLists, null)) {
return ordered(tsDataType, tvLists, deletionList,
maxNumberOfPointsInPage);
} else {
return mergeSort(tsDataType, tvLists, deletionList,
maxNumberOfPointsInPage);
@@ -341,6 +385,7 @@ public class MemPointIteratorFactory {
public static MemPointIterator create(
TSDataType tsDataType,
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<TimeRange> deletionList,
@@ -350,16 +395,18 @@ public class MemPointIteratorFactory {
if (tvLists.size() == 1) {
return single(
tvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
deletionList,
floatPrecision,
encoding,
maxNumberOfPointsInPage);
- } else if (isCompleteOrdered(tvLists)) {
+ } else if (isCompleteOrdered(tvLists, tvListRowCounts)) {
return ordered(
tsDataType,
tvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
deletionList,
@@ -370,6 +417,7 @@ public class MemPointIteratorFactory {
return mergeSort(
tsDataType,
tvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
deletionList,
@@ -386,7 +434,7 @@ public class MemPointIteratorFactory {
int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
return single(tsDataTypes, columnIndexList, alignedTvLists,
maxNumberOfPointsInPage);
- } else if (isCompleteOrdered(alignedTvLists)) {
+ } else if (isCompleteOrdered(alignedTvLists, null)) {
return ordered(tsDataTypes, columnIndexList, alignedTvLists,
maxNumberOfPointsInPage);
} else {
return mergeSort(tsDataTypes, columnIndexList, alignedTvLists,
maxNumberOfPointsInPage);
@@ -406,7 +454,7 @@ public class MemPointIteratorFactory {
alignedTvLists,
valueColumnsDeletionList,
maxNumberOfPointsInPage);
- } else if (isCompleteOrdered(alignedTvLists)) {
+ } else if (isCompleteOrdered(alignedTvLists, null)) {
return ordered(
tsDataTypes,
columnIndexList,
@@ -427,6 +475,7 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -438,17 +487,19 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
floatPrecision,
encodingList,
maxNumberOfPointsInPage);
- } else if (isCompleteOrdered(alignedTvLists)) {
+ } else if (isCompleteOrdered(alignedTvLists, tvListRowCounts)) {
return ordered(
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
@@ -460,6 +511,7 @@ public class MemPointIteratorFactory {
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
@@ -469,21 +521,24 @@ public class MemPointIteratorFactory {
}
}
- private static boolean isCompleteOrdered(List<? extends TVList> tvLists) {
+ private static boolean isCompleteOrdered(
+ List<? extends TVList> tvLists, List<Integer> tvListRowCounts) {
long time = Long.MIN_VALUE;
for (int i = 0; i < tvLists.size(); i++) {
TVList list = tvLists.get(i);
- if (!list.isSorted()) {
- return false;
- }
+ int rowCount = tvListRowCounts == null ? list.rowCount() :
tvListRowCounts.get(i);
- if (tvLists.get(i).rowCount() == 0) {
+ if (rowCount == 0) {
continue;
}
+ if (list.seqRowCount() < rowCount) {
+ return false;
+ }
+
if (i > 0 && list.getTime(0) <= time) {
return false;
}
- time = list.getTime(list.rowCount() - 1);
+ time = list.getTime(rowCount - 1);
}
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
index bf96e822b06..c507bd75d50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
@@ -54,6 +54,7 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -64,6 +65,7 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
index 44fa0d47807..2e33811c257 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
@@ -47,6 +47,7 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
Filter globalTimeFilter,
TSDataType tsDataType,
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
List<TimeRange> deletionList,
Integer floatPrecision,
TSEncoding encoding,
@@ -56,6 +57,7 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
globalTimeFilter,
tsDataType,
tvLists,
+ tvListRowCounts,
deletionList,
floatPrecision,
encoding,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
index 64ed298da24..373b53c9f90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
@@ -60,6 +60,7 @@ public abstract class MultiAlignedTVListIterator extends
MemPointIterator {
List<TSDataType> tsDataTypeList,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -71,10 +72,12 @@ public abstract class MultiAlignedTVListIterator extends
MemPointIterator {
this.columnIndexList = columnIndexList;
this.alignedTvListIterators = new ArrayList<>(alignedTvLists.size());
if (scanOrder.isAscending()) {
- for (AlignedTVList alignedTVList : alignedTvLists) {
+ for (int i = 0; i < alignedTvLists.size(); i++) {
+ AlignedTVList alignedTVList = alignedTvLists.get(i);
AlignedTVList.AlignedTVListIterator iterator =
alignedTVList.iterator(
scanOrder,
+ tvListRowCounts == null ? alignedTVList.rowCount() :
tvListRowCounts.get(i),
globalTimeFilter,
tsDataTypeList,
columnIndexList,
@@ -90,6 +93,7 @@ public abstract class MultiAlignedTVListIterator extends
MemPointIterator {
AlignedTVList.AlignedTVListIterator iterator =
alignedTVList.iterator(
scanOrder,
+ tvListRowCounts == null ? alignedTVList.rowCount() :
tvListRowCounts.get(i),
globalTimeFilter,
tsDataTypeList,
columnIndexList,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
index a8d5879b767..b735cc7927b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
@@ -56,6 +56,7 @@ public abstract class MultiTVListIterator extends
MemPointIterator {
Filter globalTimeFilter,
TSDataType tsDataType,
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
List<TimeRange> deletionList,
Integer floatPrecision,
TSEncoding encoding,
@@ -64,18 +65,33 @@ public abstract class MultiTVListIterator extends
MemPointIterator {
this.tsDataType = tsDataType;
this.tvListIterators = new ArrayList<>(tvLists.size());
if (scanOrder.isAscending()) {
- for (TVList tvList : tvLists) {
+ for (int i = 0; i < tvLists.size(); i++) {
+ TVList tvList = tvLists.get(i);
+ int rowCount = tvListRowCounts == null ? tvList.rowCount :
tvListRowCounts.get(i);
TVList.TVListIterator iterator =
tvList.iterator(
- scanOrder, globalTimeFilter, deletionList, null, null,
maxNumberOfPointsInPage);
+ scanOrder,
+ rowCount,
+ globalTimeFilter,
+ deletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
tvListIterators.add(iterator);
}
} else {
for (int i = tvLists.size() - 1; i >= 0; i--) {
TVList tvList = tvLists.get(i);
+ int rowCount = tvListRowCounts == null ? tvList.rowCount :
tvListRowCounts.get(i);
TVList.TVListIterator iterator =
tvList.iterator(
- scanOrder, globalTimeFilter, deletionList, null, null,
maxNumberOfPointsInPage);
+ scanOrder,
+ rowCount,
+ globalTimeFilter,
+ deletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
tvListIterators.add(iterator);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
index c138b1ff07b..46d8c5f02ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
@@ -41,6 +41,7 @@ public class OrderedMultiAlignedTVListIterator extends
MultiAlignedTVListIterato
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
+ List<Integer> tvListRowCounts,
Ordering scanOrder,
Filter globalTimeFilter,
List<List<TimeRange>> valueColumnsDeletionList,
@@ -51,6 +52,7 @@ public class OrderedMultiAlignedTVListIterator extends
MultiAlignedTVListIterato
tsDataTypes,
columnIndexList,
alignedTvLists,
+ tvListRowCounts,
scanOrder,
globalTimeFilter,
valueColumnsDeletionList,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
index c7ae99386c5..61ae5f1360d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
@@ -35,6 +35,7 @@ public class OrderedMultiTVListIterator extends
MultiTVListIterator {
Filter globalTimeFilter,
TSDataType tsDataType,
List<TVList> tvLists,
+ List<Integer> tvListRowCounts,
List<TimeRange> deletionList,
Integer floatPrecision,
TSEncoding encoding,
@@ -44,6 +45,7 @@ public class OrderedMultiTVListIterator extends
MultiTVListIterator {
globalTimeFilter,
tsDataType,
tvLists,
+ tvListRowCounts,
deletionList,
floatPrecision,
encoding,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 3d5ee978469..c88968d6ffa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -246,10 +246,6 @@ public abstract class TVList implements WALEntryValue {
return indices.get(arrayIndex)[elementIndex];
}
- public int getValueIndex(int index, Ordering ordering) {
- return ordering.isAscending() ? getValueIndex(index) :
getValueIndex(rowCount - 1 - index);
- }
-
protected void markNullValue(int arrayIndex, int elementIndex) {
// init bitMap if doesn't have
if (bitMap == null) {
@@ -650,6 +646,7 @@ public abstract class TVList implements WALEntryValue {
public TVListIterator iterator(
Ordering scanOrder,
+ int rowCount,
Filter globalTimeFilter,
List<TimeRange> deletionList,
Integer floatPrecision,
@@ -657,6 +654,7 @@ public abstract class TVList implements WALEntryValue {
int maxNumberOfPointsInPage) {
return new TVListIterator(
scanOrder,
+ rowCount,
globalTimeFilter,
deletionList,
floatPrecision,
@@ -681,6 +679,7 @@ public abstract class TVList implements WALEntryValue {
public TVListIterator(
Ordering scanOrder,
+ int rowCount,
Filter globalTimeFilter,
List<TimeRange> deletionList,
Integer floatPrecision,
@@ -942,7 +941,7 @@ public abstract class TVList implements WALEntryValue {
// When traversing in desc order, the index needs to be converted
public int getScanOrderIndex(int rowIndex) {
- return scanOrder.isAscending() ? rowIndex : rowCount - 1 - rowIndex;
+ return scanOrder.isAscending() ? rowIndex : rows - 1 - rowIndex;
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
index 6e3e21ff904..59e58182f6f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
@@ -738,7 +738,6 @@ public class AlignedTVListIteratorTest {
AlignedTVList alignedTVList =
AlignedTVList.newAlignedList(
Arrays.asList(TSDataType.INT64, TSDataType.BOOLEAN,
TSDataType.BOOLEAN));
- int rowCount = 0;
for (TimeRange timeRange : timeRanges) {
long start = timeRange.getMin();
long end = timeRange.getMax();
@@ -757,11 +756,10 @@ public class AlignedTVListIteratorTest {
}
alignedTVList.putAlignedValue(
timestamp, new Object[] {timestamp, timestamp % 2 == 0, true});
- rowCount++;
}
}
Map<TVList, Integer> tvListMap = new HashMap<>();
- tvListMap.put(alignedTVList, rowCount);
+ tvListMap.put(alignedTVList, alignedTVList.rowCount());
return tvListMap;
}
@@ -772,7 +770,6 @@ public class AlignedTVListIteratorTest {
AlignedTVList alignedTVList =
AlignedTVList.newAlignedList(
Arrays.asList(TSDataType.INT64, TSDataType.BOOLEAN,
TSDataType.BOOLEAN));
- int rowCount = 0;
long start = timeRange.getMin();
long end = timeRange.getMax();
List<Long> timestamps = new ArrayList<>((int) (end - start + 1));
@@ -790,9 +787,8 @@ public class AlignedTVListIteratorTest {
}
alignedTVList.putAlignedValue(
timestamp, new Object[] {timestamp, timestamp % 2 == 0, isLast});
- rowCount++;
}
- tvListMap.put(alignedTVList, rowCount);
+ tvListMap.put(alignedTVList, alignedTVList.rowCount());
}
return tvListMap;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
index 62c801f4e34..a984eaada1f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java
@@ -582,7 +582,6 @@ public class NonAlignedTVListIteratorTest {
private static Map<TVList, Integer>
buildNonAlignedSingleTvListMap(List<TimeRange> timeRanges) {
TVList tvList = TVList.newList(TSDataType.INT64);
- int rowCount = 0;
for (TimeRange timeRange : timeRanges) {
long start = timeRange.getMin();
long end = timeRange.getMax();
@@ -599,11 +598,10 @@ public class NonAlignedTVListIteratorTest {
}
}
tvList.putLong(timestamp, timestamp);
- rowCount++;
}
}
Map<TVList, Integer> tvListMap = new HashMap<>();
- tvListMap.put(tvList, rowCount);
+ tvListMap.put(tvList, tvList.rowCount());
return tvListMap;
}
@@ -611,7 +609,6 @@ public class NonAlignedTVListIteratorTest {
Map<TVList, Integer> tvListMap = new LinkedHashMap<>();
for (TimeRange timeRange : timeRanges) {
TVList tvList = TVList.newList(TSDataType.INT64);
- int rowCount = 0;
long start = timeRange.getMin();
long end = timeRange.getMax();
List<Long> timestamps = new ArrayList<>((int) (end - start + 1));
@@ -621,9 +618,8 @@ public class NonAlignedTVListIteratorTest {
Collections.shuffle(timestamps);
for (Long timestamp : timestamps) {
tvList.putLong(timestamp, timestamp);
- rowCount++;
}
- tvListMap.put(tvList, rowCount);
+ tvListMap.put(tvList, tvList.rowCount());
}
return tvListMap;
}