This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch skipNotSatisfiedTimeRange-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a179718fb5b5af0d395f7a99d8b0bd2868a74d58 Author: shuwenwei <[email protected]> AuthorDate: Fri Oct 17 18:29:12 2025 +0800 Optimize memtable scan --- .../fragment/FragmentInstanceContext.java | 20 ++ .../execution/operator/source/SeriesScanUtil.java | 65 +++++- .../read/reader/common/NoDataPointReader.java | 59 ++++++ .../db/utils/datastructure/AlignedTVList.java | 90 +++++--- .../iotdb/db/utils/datastructure/LazyBitMap.java | 93 +++++++++ .../db/utils/datastructure/MemPointIterator.java | 3 + .../MergeSortMultiAlignedTVListIterator.java | 22 ++ .../MergeSortMultiTVListIterator.java | 22 ++ .../OrderedMultiAlignedTVListIterator.java | 24 +++ .../datastructure/OrderedMultiTVListIterator.java | 24 +++ .../iotdb/db/utils/datastructure/TVList.java | 229 ++++++++++++++++++++- .../memtable/AlignedTVListIteratorTest.java | 110 ++++++++++ .../memtable/NonAlignedTVListIteratorTest.java | 87 ++++++++ 13 files changed, 799 insertions(+), 49 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index cdb44db5f66..c8c89d05399 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -55,12 +55,14 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.ZoneId; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -91,6 +93,7 @@ public class FragmentInstanceContext extends QueryContext { protected IDataRegionForQuery dataRegion; private Filter globalTimeFilter; + private List<TimeRange> globalTimeFilterTimeRanges; // it will only be used once, after sharedQueryDataSource being inited, it will be set to null protected List<PartialPath> sourcePaths; @@ -511,6 +514,23 @@ public class FragmentInstanceContext extends QueryContext { return globalTimeFilter; } + public List<TimeRange> getGlobalTimeFilterTimeRanges() { + if (globalTimeFilter == null) { + return Collections.singletonList(new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE)); + } + List<TimeRange> local = globalTimeFilterTimeRanges; + if (local == null) { + synchronized (this) { + local = globalTimeFilterTimeRanges; + if (local == null) { + local = globalTimeFilter.getTimeRanges(); + globalTimeFilterTimeRanges = local; + } + } + } + return local; + } + public IDataRegionForQuery getDataRegion() { return dataRegion; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index d9e49d63ebf..2df717d9738 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLo import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemPageReader; import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.DescPriorityMergeReader; import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.MergeReaderPriority; +import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.NoDataPointReader; import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.PriorityMergeReader; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.datastructure.MemPointIterator; @@ -131,7 +132,10 @@ public class SeriesScanUtil implements Accountable { + RamUsageEstimator.shallowSizeOfInstance(IDeviceID.class) + RamUsageEstimator.shallowSizeOfInstance(TimeOrderUtils.class) + RamUsageEstimator.shallowSizeOfInstance(PaginationController.class) - + RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class); + + RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class) + + RamUsageEstimator.shallowSizeOfInstance(TimeRange.class); + + protected TimeRange satisfiedTimeRange; public SeriesScanUtil( PartialPath seriesPath, @@ -193,6 +197,20 @@ public class SeriesScanUtil implements Accountable { // init file index orderUtils.setCurSeqFileIndex(dataSource); curUnseqFileIndex = 0; + + if (satisfiedTimeRange == null) { + long startTime = Long.MAX_VALUE; + long endTime = Long.MIN_VALUE; + if (scanOptions.getGlobalTimeFilter() == null) { + satisfiedTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE); + return; + } + for (TimeRange timeRange : context.getGlobalTimeFilterTimeRanges()) { + startTime = Math.min(startTime, timeRange.getMin()); + endTime = Math.max(endTime, timeRange.getMax()); + } + satisfiedTimeRange = new TimeRange(startTime, endTime); + } } protected PriorityMergeReader getPriorityMergeReader() { @@ -660,6 +678,13 @@ public class SeriesScanUtil implements Accountable { readOnlyMemChunk.createMemPointIterator( orderUtils.getScanOrder(), scanOptions.getGlobalTimeFilter()); for (Statistics<? extends Serializable> statistics : statisticsList) { + long orderTime = orderUtils.getOrderTime(statistics); + boolean canSkip = + (orderUtils.getAscending() && orderTime > satisfiedTimeRange.getMax()) + || (!orderUtils.getAscending() && orderTime < satisfiedTimeRange.getMin()); + if (canSkip) { + break; + } IVersionPageReader versionPageReader = new LazyMemVersionPageReader( context, @@ -1434,6 +1459,7 @@ public class SeriesScanUtil implements Accountable { protected final boolean isSeq; protected final boolean isAligned; private boolean inited = false; + private boolean hasData = true; LazyMemVersionPageReader( QueryContext context, @@ -1453,11 +1479,14 @@ public class SeriesScanUtil implements Accountable { } public IPointReader getPointReader() { + if (!hasData) { + return NoDataPointReader.getInstance(); + } return memPointIterator; } public boolean hasNextBatch() { - return memPointIterator.hasNextBatch(); + return hasData && memPointIterator.hasNextBatch(); } public void setCurrentPageTimeRangeToMemPointIterator() { @@ -1466,10 +1495,34 @@ public class SeriesScanUtil implements Accountable { } if (statistics.getStartTime() > statistics.getEndTime()) { // empty + hasData = false; return; } - this.memPointIterator.setCurrentPageTimeRange( - new TimeRange(statistics.getStartTime(), statistics.getEndTime())); + Filter globalTimeFilter = ((FragmentInstanceContext) context).getGlobalTimeFilter(); + if (globalTimeFilter == null) { + this.memPointIterator.setCurrentPageTimeRange( + new TimeRange(statistics.getStartTime(), statistics.getEndTime())); + return; + } + + long startTime = statistics.getStartTime(); + long endTime = statistics.getEndTime(); + long minStart = Long.MAX_VALUE; + long maxEnd = Long.MIN_VALUE; + for (TimeRange timeRange : + ((FragmentInstanceContext) context).getGlobalTimeFilterTimeRanges()) { + if (timeRange.overlaps(new TimeRange(startTime, endTime))) { + minStart = Math.min(minStart, Math.max(timeRange.getMin(), startTime)); + maxEnd = Math.max(maxEnd, Math.min(timeRange.getMax(), endTime)); + } + } + + if (minStart > maxEnd) { + hasData = false; + return; + } + + this.memPointIterator.setCurrentPageTimeRange(new TimeRange(minStart, maxEnd)); } public TsBlock nextBatch() { @@ -1605,7 +1658,7 @@ public class SeriesScanUtil implements Accountable { @SuppressWarnings("squid:S3740") @Override public long getOverlapCheckTime(Statistics range) { - return range.getStartTime(); + return Math.max(satisfiedTimeRange.getMin(), range.getStartTime()); } @SuppressWarnings("squid:S3740") @@ -1734,7 +1787,7 @@ public class SeriesScanUtil implements Accountable { @SuppressWarnings("squid:S3740") @Override public long getOverlapCheckTime(Statistics range) { - return range.getEndTime(); + return Math.min(satisfiedTimeRange.getMax(), range.getEndTime()); } @SuppressWarnings("squid:S3740") diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/NoDataPointReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/NoDataPointReader.java new file mode 100644 index 00000000000..945318f6d22 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/NoDataPointReader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.reader.common; + +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.reader.IPointReader; + +import java.io.IOException; + +public class NoDataPointReader implements IPointReader { + + private NoDataPointReader() {} + + private static final IPointReader instance = new NoDataPointReader(); + + public static IPointReader getInstance() { + return instance; + } + + @Override + public boolean hasNextTimeValuePair() throws IOException { + return false; + } + + @Override + public TimeValuePair nextTimeValuePair() throws IOException { + return null; + } + + @Override + public TimeValuePair currentTimeValuePair() throws IOException { + return null; + } + + @Override + public long getUsedMemorySize() { + return 0; + } + + @Override + public void close() throws IOException {} +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 18853a94596..cf701383b37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -1617,13 +1617,16 @@ public abstract class AlignedTVList extends TVList { @Override public TsBlock nextBatch() { - TsBlockBuilder builder = new TsBlockBuilder(maxNumberOfPointsInPage, dataTypeList); + int maxRowCountOfCurrentBatch = Math.min(rows - index, maxNumberOfPointsInPage); + TsBlockBuilder builder = new TsBlockBuilder(maxRowCountOfCurrentBatch, dataTypeList); // Time column TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); int validRowCount = 0; - // duplicated time or deleted time are all invalid, true if we don't need this row - BitMap timeDuplicateInfo = null; + // Rows that are deleted or whose timestamps do not match the filter are considered invalid. + // The corresponding bit is set to true if the row is not needed. + LazyBitMap timeInvalidInfo = null; + LazyBitMap timeDuplicatedInfo = null; int startIndex = index; // time column @@ -1632,12 +1635,15 @@ public abstract class AlignedTVList extends TVList { if (validRowCount >= maxNumberOfPointsInPage || isCurrentTimeExceedTimeRange(time)) { break; } - // skip empty row - if (allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(getValueIndex(getScanOrderIndex(index)))) { - continue; - } - if (!isTimeSatisfied(time)) { + // skip invalid row + if ((allValueColDeletedMap != null + && allValueColDeletedMap.isMarked(getValueIndex(getScanOrderIndex(index)))) + || !isTimeSatisfied(time)) { + timeInvalidInfo = + timeInvalidInfo == null + ? new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1) + : timeInvalidInfo; + timeInvalidInfo.mark(index); continue; } int nextRowIndex = index + 1; @@ -1645,20 +1651,25 @@ public abstract class AlignedTVList extends TVList { && ((allValueColDeletedMap != null && allValueColDeletedMap.isMarked( getValueIndex(getScanOrderIndex(nextRowIndex)))) - || !isTimeSatisfied(time))) { + || !isTimeSatisfied(getTime(getScanOrderIndex(nextRowIndex))))) { + timeInvalidInfo = + timeInvalidInfo == null + ? new LazyBitMap(nextRowIndex, maxRowCountOfCurrentBatch, rows - 1) + : timeInvalidInfo; + timeInvalidInfo.mark(nextRowIndex); nextRowIndex++; } if ((nextRowIndex == rows || time != getTime(getScanOrderIndex(nextRowIndex)))) { timeBuilder.writeLong(time); validRowCount++; } else { - if (Objects.isNull(timeDuplicateInfo)) { - timeDuplicateInfo = new BitMap(rows); + if (Objects.isNull(timeDuplicatedInfo)) { + timeDuplicatedInfo = new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1); } - // For this timeInvalidInfo, we mark all positions that are not the last one in the + // For this timeDuplicatedInfo, we mark all positions that are not the last one in the // ASC traversal. It has the same behaviour for the DESC traversal, because our ultimate // goal is to process all the data with the same timestamp before writing it into TsBlock. - timeDuplicateInfo.mark(index); + timeDuplicatedInfo.mark(index); } index = nextRowIndex - 1; } @@ -1673,21 +1684,20 @@ public abstract class AlignedTVList extends TVList { int[] deleteCursor = {0}; // Pair of Time and Index Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null; - if (Objects.nonNull(timeDuplicateInfo)) { + if (Objects.nonNull(timeDuplicatedInfo)) { lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null); } ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex); currentWriteRowIndex = 0; for (int sortedRowIndex = startIndex; sortedRowIndex < index; sortedRowIndex++) { - // skip empty row - if ((allValueColDeletedMap != null - && allValueColDeletedMap.isMarked( - getValueIndex(getScanOrderIndex(sortedRowIndex)))) - || !isTimeSatisfied(getTime(getScanOrderIndex(sortedRowIndex)))) { - continue; + // skip invalid rows + if (Objects.nonNull(timeInvalidInfo)) { + if (timeInvalidInfo.isMarked(sortedRowIndex)) { + continue; + } } - // skip time duplicated or totally deleted rows - if (Objects.nonNull(timeDuplicateInfo)) { + // skip time duplicated rows + if (Objects.nonNull(timeDuplicatedInfo)) { if (!outer.isNullValue( getValueIndex(getScanOrderIndex(sortedRowIndex)), validColumnIndex)) { lastValidPointIndexForTimeDupCheck.left = getTime(getScanOrderIndex(sortedRowIndex)); @@ -1702,11 +1712,11 @@ public abstract class AlignedTVList extends TVList { getValueIndex(getScanOrderIndex(sortedRowIndex)); } } - // timeInvalidInfo was constructed when traversing the time column before. It can be - // reused when traversing each value column to skip deleted rows or non-last rows with + // timeDuplicatedInfo was constructed when traversing the time column before. It can be + // reused when traversing each value column to skip non-last rows with // duplicated timestamps. // Until the last duplicate timestamp is encountered, it will be skipped here. - if (timeDuplicateInfo.isMarked(sortedRowIndex)) { + if (timeDuplicatedInfo.isMarked(sortedRowIndex)) { continue; } } @@ -1759,9 +1769,9 @@ public abstract class AlignedTVList extends TVList { TsBlock tsBlock = builder.build(); if (needRebuildTsBlock(hasAnyNonNullValue)) { // if exist all null rows, at most have validRowCount - 1 valid rows + // When rebuilding TsBlock, pushDownFilter and paginationController are also processed. tsBlock = reBuildTsBlock(hasAnyNonNullValue, validRowCount, dataTypeList, tsBlock); - } - if (pushDownFilter != null) { + } else if (pushDownFilter != null) { tsBlock = TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( tsBlock, @@ -1823,21 +1833,35 @@ public abstract class AlignedTVList extends TVList { int previousValidRowCount, List<TSDataType> tsDataTypeList, TsBlock previousTsBlock) { + boolean[] selection = hasAnyNonNullValue; + if (pushDownFilter != null) { + selection = pushDownFilter.satisfyTsBlock(hasAnyNonNullValue, previousTsBlock); + } TsBlockBuilder builder = new TsBlockBuilder(previousValidRowCount - 1, tsDataTypeList); TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); Column timeColumn = previousTsBlock.getTimeColumn(); + int stopIndex = previousValidRowCount; for (int i = 0; i < previousValidRowCount; i++) { - if (hasAnyNonNullValue[i]) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - builder.declarePosition(); + if (selection[i]) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + selection[i] = false; + } else if (paginationController.hasCurLimit()) { + timeColumnBuilder.writeLong(timeColumn.getLong(i)); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + stopIndex = i; + break; + } } } for (int columnIndex = 0; columnIndex < tsDataTypeList.size(); columnIndex++) { ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex); Column column = previousTsBlock.getColumn(columnIndex); - for (int i = 0; i < previousValidRowCount; i++) { - if (hasAnyNonNullValue[i]) { + for (int i = 0; i < stopIndex; i++) { + if (selection[i]) { if (column.isNull(i)) { columnBuilder.appendNull(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LazyBitMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LazyBitMap.java new file mode 100644 index 00000000000..dd5eac1a528 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LazyBitMap.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils.datastructure; + +import org.apache.tsfile.utils.BitMap; + +import java.util.ArrayList; + +public class LazyBitMap { + private final int startPosition; + private final int endPosition; + private final int blockSize; + private final ArrayList<BitMap> blocks; + + public LazyBitMap(int startIndex, int appendSize, int endIndex) { + if (endIndex < startIndex) { + throw new IllegalArgumentException("endIndex must be >= startIndex"); + } + if (appendSize <= 0) { + throw new IllegalArgumentException("appendSize must be positive"); + } + this.startPosition = startIndex; + this.endPosition = endIndex; + this.blockSize = appendSize; + this.blocks = new ArrayList<>(2); + } + + public void mark(int index) { + if (index < startPosition) { + throw new IndexOutOfBoundsException("Index below startPosition: " + index); + } + if (index > endPosition) { + throw new IndexOutOfBoundsException("Index exceeds endPosition: " + index); + } + int blockIndex = getBlockIndex(index); + ensureCapacity(blockIndex); + BitMap block = blocks.get(blockIndex); + if (block == null) { + block = new BitMap(blockSize); + blocks.set(blockIndex, block); + } + block.mark(getInnerIndex(index)); + } + + private void ensureCapacity(int blockIndex) { + while (blockIndex >= blocks.size()) { + blocks.add(null); + } + } + + public boolean isMarked(int index) { + if (index < startPosition) { + return false; + } + if (index > endPosition) { + throw new IndexOutOfBoundsException("Index exceeds endPosition: " + index); + } + int blockIndex = getBlockIndex(index); + if (blockIndex >= blocks.size()) { + return false; + } + BitMap block = blocks.get(blockIndex); + if (block == null) { + return false; + } + return block.isMarked(getInnerIndex(index)); + } + + private int getBlockIndex(int index) { + return (index - startPosition) / blockSize; + } + + private int getInnerIndex(int index) { + return (index - startPosition) % blockSize; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java index 1cce0ff0b4d..ca5686f7ac3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java @@ -69,8 +69,11 @@ public abstract class MemPointIterator implements IPointReader { public void setCurrentPageTimeRange(TimeRange timeRange) { this.timeRange = timeRange; + skipToCurrentTimeRangeStartPosition(); } + protected void skipToCurrentTimeRangeStartPosition() {} + protected boolean isCurrentTimeExceedTimeRange(long time) { return timeRange != null && (scanOrder.isAscending() ? (time > timeRange.getMax()) : (time < timeRange.getMin())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java index c507bd75d50..7fa1949959d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java @@ -86,6 +86,20 @@ public class MergeSortMultiAlignedTVListIterator extends MultiAlignedTVListItera a.left.equals(b.left) ? a.right.compareTo(b.right) : b.left.compareTo(a.left)); } + @Override + protected void skipToCurrentTimeRangeStartPosition() { + hasNext = false; + probeIterators.clear(); + for (int i = 0; i < alignedTvListIterators.size(); i++) { + AlignedTVList.AlignedTVListIterator iterator = alignedTvListIterators.get(i); + iterator.skipToCurrentTimeRangeStartPosition(); + if (iterator.hasNextTimeValuePair()) { + probeIterators.add(i); + } + } + probeNext = false; + } + @Override protected void prepareNext() { hasNext = false; @@ -288,4 +302,12 @@ public class MergeSortMultiAlignedTVListIterator extends MultiAlignedTVListItera protected int currentRowIndex(int columnIndex) { return rowIndices[columnIndex]; } + + @Override + public void setCurrentPageTimeRange(TimeRange timeRange) { + for (TVList.TVListIterator tvListIterator : this.alignedTvListIterators) { + tvListIterator.timeRange = timeRange; + } + super.setCurrentPageTimeRange(timeRange); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java index 2e33811c257..aebd71dbddd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java @@ -73,6 +73,20 @@ public class MergeSortMultiTVListIterator extends MultiTVListIterator { a.left.equals(b.left) ? a.right.compareTo(b.right) : b.left.compareTo(a.left)); } + @Override + protected void skipToCurrentTimeRangeStartPosition() { + hasNext = false; + probeIterators.clear(); + for (int i = 0; i < tvListIterators.size(); i++) { + TVList.TVListIterator iterator = tvListIterators.get(i); + iterator.skipToCurrentTimeRangeStartPosition(); + if (iterator.hasNextTimeValuePair()) { + probeIterators.add(i); + } + } + probeNext = false; + } + @Override protected void prepareNext() { hasNext = false; @@ -167,4 +181,12 @@ public class MergeSortMultiTVListIterator extends MultiTVListIterator { } } } + + @Override + public void setCurrentPageTimeRange(TimeRange timeRange) { + for (TVList.TVListIterator tvListIterator : this.tvListIterators) { + tvListIterator.timeRange = timeRange; + } + super.setCurrentPageTimeRange(timeRange); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java index 46d8c5f02ac..b21a619d238 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java @@ -98,6 +98,22 @@ public class OrderedMultiAlignedTVListIterator extends MultiAlignedTVListIterato probeNext = true; } + @Override + protected void skipToCurrentTimeRangeStartPosition() { + hasNext = false; + iteratorIndex = 0; + while (iteratorIndex < alignedTvListIterators.size() && !hasNext) { + AlignedTVList.AlignedTVListIterator iterator = alignedTvListIterators.get(iteratorIndex); + iterator.skipToCurrentTimeRangeStartPosition(); + if (!iterator.hasNextTimeValuePair()) { + iteratorIndex++; + continue; + } + hasNext = iterator.hasNextTimeValuePair(); + } + probeNext = false; + } + @Override protected void next() { TVList.TVListIterator iterator = alignedTvListIterators.get(iteratorIndex); @@ -129,4 +145,12 @@ public class OrderedMultiAlignedTVListIterator extends MultiAlignedTVListIterato protected int currentRowIndex(int columnIndex) { return rowIndices[columnIndex]; } + + @Override + public void setCurrentPageTimeRange(TimeRange timeRange) { + for (AlignedTVList.AlignedTVListIterator iterator : alignedTvListIterators) { + iterator.timeRange = timeRange; + } + super.setCurrentPageTimeRange(timeRange); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java index 61ae5f1360d..27f79eca846 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java @@ -68,6 +68,22 @@ public class OrderedMultiTVListIterator extends MultiTVListIterator { probeNext = true; } + @Override + protected void skipToCurrentTimeRangeStartPosition() { + hasNext = false; + iteratorIndex = 0; + while (iteratorIndex < tvListIterators.size() && !hasNext) { + TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex); + iterator.skipToCurrentTimeRangeStartPosition(); + if (!iterator.hasNextTimeValuePair()) { + iteratorIndex++; + continue; + } + hasNext = iterator.hasNextTimeValuePair(); + } + probeNext = false; + } + @Override protected void next() { tvListIterators.get(iteratorIndex).next(); @@ -90,4 +106,12 @@ public class OrderedMultiTVListIterator extends MultiTVListIterator { } probeNext = false; } + + @Override + public void setCurrentPageTimeRange(TimeRange timeRange) { + for (TVList.TVListIterator tvListIterator : this.tvListIterators) { + tvListIterator.timeRange = timeRange; + } + super.setCurrentPageTimeRange(timeRange); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index e37fdec551e..8fa7925bc3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -199,6 +199,105 @@ public abstract class TVList implements WALEntryValue { return timestamps.get(arrayIndex)[elementIndex]; } + /** + * Performs a binary search to find the first position whose timestamp is greater than or equal to + * the given {@code time}. + * + * <p>This method assumes timestamps are sorted in ascending order. If the list is not sorted, an + * {@link UnsupportedOperationException} will be thrown. + * + * <p>Typical use case: locate the starting index of a time range query. + * + * <p>Example: + * + * <ul> + * <li>timestamps = [10, 20, 20, 25, 30] + * <li>time = 5 → return 0 + * <li>time = 20 → return 1 + * <li>time = 21 → return 3 + * <li>time = 40 → return 5 (all timestamps < 40) + * </ul> + * + * <p><b>Return value range:</b> + * + * <ul> + * <li>When a matching or greater element exists: {@code 0 <= index <= seqRowCount - 1} + * <li>When all elements are smaller than {@code time}: {@code index == seqRowCount} + * </ul> + * + * @param time the target timestamp + * @param low the lower bound index (inclusive) + * @param high the upper bound index (inclusive) + * @return the index of the first timestamp ≥ {@code time}, or {@code seqRowCount} if all + * timestamps are smaller + */ + private int binarySearchTimestampFirstGreaterOrEqualsPosition(long time, int low, int high) { + if (!sorted && high >= seqRowCount) { + throw new UnsupportedOperationException("Current TVList is not sorted"); + } + int mid; + while (low <= high) { + mid = low + ((high - low) >>> 1); + long midTime = getTime(mid); + if (midTime < time) { + low = mid + 1; + } else { + high = mid - 1; + } + } + return low; + } + + /** + * Performs a binary search to find the last position whose timestamp is less than or equal to the + * given {@code time}. + * + * <p>This method assumes timestamps are sorted in ascending order. If the list is not sorted, an + * {@link UnsupportedOperationException} will be thrown. + * + * <p>Typical use case: locate the ending index of a time range query. + * + * <p>Example: + * + * <ul> + * <li>timestamps = [10, 20, 20, 25, 30] + * <li>time = 5 → return -1 (no timestamp ≤ 5) + * <li>time = 20 → return 2 + * <li>time = 21 → return 2 + * <li>time = 50 → return 4 + * </ul> + * + * <p><b>Return value range:</b> + * + * <ul> + * <li>When a matching or smaller element exists: {@code 0 <= index <= seqRowCount - 1} + * <li>When all elements are greater than {@code time}: {@code index == -1} + * </ul> + * + * @param time the target timestamp + * @param low the lower bound index (inclusive) + * @param high the upper bound index (inclusive) + * @return the index of the last timestamp ≤ {@code time}, or {@code -1} if all timestamps are + * greater + */ + private int binarySearchTimestampLastLessOrEqualsPosition(long time, int low, int high) { + if (!sorted && high >= seqRowCount) { + throw new UnsupportedOperationException("Current TVList is not sorted"); + } + + int mid; + while (low <= high) { + mid = low + ((high - low) >>> 1); + long midTime = getTime(mid); + if (midTime <= time) { + low = mid + 1; + } else { + high = mid - 1; + } + } + return high; + } + protected void set(int src, int dest) { long srcT = getTime(src); int srcV = getValueIndex(src); @@ -699,6 +798,61 @@ public abstract class TVList implements WALEntryValue { deleteCursor = new int[] {cursor}; } + @Override + protected void skipToCurrentTimeRangeStartPosition() { + if (timeRange == null || index >= rows) { + return; + } + if (timeRange.contains(getTime(getScanOrderIndex(index)))) { + return; + } + + int indexInTVList; + // Since there may be duplicate timestamps in TVList, we need to move to the index of the + // first timestamp that meets the requirements under current scanOrder. + if (scanOrder.isAscending()) { + long searchTimestamp = timeRange.getMin(); + if (searchTimestamp <= outer.getMinTime()) { + return; + } + if (searchTimestamp > outer.getMaxTime()) { + // all satisfied data has been consumed + index = rows; + probeNext = true; + return; + } + // For asc scan, if it can not be found, the indexInTVList is too small, and we should move + // to the next timestamp position. + // If it can be found, move to the min index of current timestamp. + indexInTVList = + binarySearchTimestampFirstGreaterOrEqualsPosition( + searchTimestamp, getScanOrderIndex(index), rows - 1); + } else { + long searchTimestamp = timeRange.getMax(); + if (searchTimestamp >= outer.getMaxTime()) { + return; + } + if (searchTimestamp < outer.getMinTime()) { + // all satisfied data has been consumed + index = rows; + probeNext = true; + return; + } + // For desc scan, regardless of whether it is found, the timestamp corresponding to + // indexInTVList has met the conditions. We only need to find the index that first + // encounters this timestamp during desc scan. + indexInTVList = + binarySearchTimestampLastLessOrEqualsPosition( + searchTimestamp, 0, getScanOrderIndex(index)); + } + int newIndex = getScanOrderIndex(indexInTVList); + if (newIndex > index) { + index = newIndex; + } + + probeNext = false; + } + protected void prepareNext() { if (scanOrder.isAscending()) { // For ASC traversal, we first find a valid index and then handle duplicate timestamps. @@ -808,10 +962,19 @@ public abstract class TVList implements WALEntryValue { @Override public TsBlock nextBatch() { TSDataType dataType = getDataType(); - TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); + int maxRowCountOfCurrentBatch = + Math.min( + paginationController.hasLimit() + ? (int) paginationController.getCurLimit() + : Integer.MAX_VALUE, + Math.min(maxNumberOfPointsInPage, rows - index)); + TsBlockBuilder builder = + new TsBlockBuilder(maxRowCountOfCurrentBatch, Collections.singletonList(dataType)); switch (dataType) { case BOOLEAN: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -822,6 +985,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { boolean aBoolean = getBoolean(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyBoolean(time, aBoolean)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeBoolean(aBoolean); builder.declarePosition(); @@ -832,7 +1001,9 @@ public abstract class TVList implements WALEntryValue { break; case INT32: case DATE: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -843,6 +1014,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { int anInt = getInt(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyInteger(time, anInt)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeInt(anInt); builder.declarePosition(); @@ -853,7 +1030,9 @@ public abstract class TVList implements WALEntryValue { break; case INT64: case TIMESTAMP: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -864,6 +1043,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { long aLong = getLong(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyLong(time, aLong)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeLong(aLong); builder.declarePosition(); @@ -873,7 +1058,9 @@ public abstract class TVList implements WALEntryValue { } break; case FLOAT: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -886,6 +1073,12 @@ public abstract class TVList implements WALEntryValue { roundValueWithGivenPrecision( getFloat(getScanOrderIndex(index)), floatPrecision, encoding); if (pushDownFilter == null || pushDownFilter.satisfyFloat(time, aFloat)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeFloat(aFloat); builder.declarePosition(); @@ -895,7 +1088,9 @@ public abstract class TVList implements WALEntryValue { } break; case DOUBLE: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -908,6 +1103,12 @@ public abstract class TVList implements WALEntryValue { roundValueWithGivenPrecision( getDouble(getScanOrderIndex(index)), floatPrecision, encoding); if (pushDownFilter == null || pushDownFilter.satisfyDouble(time, aDouble)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeDouble(aDouble); builder.declarePosition(); @@ -919,7 +1120,9 @@ public abstract class TVList implements WALEntryValue { case TEXT: case BLOB: case STRING: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -930,6 +1133,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { Binary binary = getBinary(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyBinary(time, binary)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeBinary(binary); builder.declarePosition(); @@ -942,9 +1151,9 @@ public abstract class TVList implements WALEntryValue { throw new UnSupportedDataTypeException( String.format("Data type %s is not supported.", dataType)); } - // There is no need to process pushDownFilter here because it has been applied when - // constructing the tsBlock - TsBlock tsBlock = paginationController.applyTsBlock(builder.build()); + // There is no need to process pushDownFilter and paginationController here because it has + // been applied when constructing the tsBlock + TsBlock tsBlock = builder.build(); addTsBlock(tsBlock); return tsBlock; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java index 758e0869f2c..442f69109e2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; @@ -159,6 +160,27 @@ public class AlignedTVListIteratorTest { Collections.emptyList()), false, 10); + tvListMap = buildAlignedSingleTvListMap(Collections.singletonList(new TimeRange(1, 10))); + testAligned( + tvListMap, + Ordering.ASC, + new TimeFilterOperators.TimeBetweenAnd(1L, 10L), + new LongFilterOperators.ValueBetweenAnd(0, 1, 10), + new PaginationController(10, 1), + Collections.singletonList(new TimeRange(4, 4)), + Arrays.asList(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + true, + 8); + testAligned( + tvListMap, + Ordering.DESC, + new TimeFilterOperators.TimeBetweenAnd(1L, 10L), + new LongFilterOperators.ValueBetweenAnd(0, 1, 10), + new PaginationController(10, 1), + Collections.singletonList(new TimeRange(4, 4)), + Arrays.asList(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + true, + 8); } @Test @@ -806,4 +828,92 @@ public class AlignedTVListIteratorTest { : new PaginationController( paginationController.getCurLimit(), paginationController.getCurOffset()); } + + @Test + public void testSkipTimeRange() throws QueryProcessException, IOException { + List<Map<TVList, Integer>> list = + Arrays.asList( + buildAlignedSingleTvListMap( + Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))), + buildAlignedMultiTvListMap(Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))), + buildAlignedMultiTvListMap( + Arrays.asList( + new TimeRange(10, 20), + new TimeRange(10, 20), + new TimeRange(24, 30), + new TimeRange(22, 40)))); + for (Map<TVList, Integer> tvListMap : list) { + testSkipTimeRange( + tvListMap, + Ordering.ASC, + Arrays.asList(new TimeRange(11, 13), new TimeRange(21, 21), new TimeRange(33, 34)), + Arrays.asList(new TimeRange(11, 13), new TimeRange(33, 34))); + testSkipTimeRange( + tvListMap, + Ordering.DESC, + Arrays.asList(new TimeRange(33, 34), new TimeRange(21, 21), new TimeRange(11, 13)), + Arrays.asList(new TimeRange(33, 34), new TimeRange(11, 13))); + } + } + + private void testSkipTimeRange( + Map<TVList, Integer> tvListMap, + Ordering scanOrder, + List<TimeRange> statisticsTimeRanges, + List<TimeRange> expectedResultTimeRange) + throws QueryProcessException, IOException { + List<Integer> columnIdxList = Arrays.asList(0, 1, 2); + IMeasurementSchema measurementSchema = getMeasurementSchema(); + AlignedReadOnlyMemChunk chunk = + new AlignedReadOnlyMemChunk( + fragmentInstanceContext, + columnIdxList, + measurementSchema, + tvListMap, + Collections.emptyList(), + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); + chunk.sortTvLists(); + chunk.initChunkMetaFromTVListsWithFakeStatistics(); + MemPointIterator memPointIterator = chunk.createMemPointIterator(scanOrder, null); + List<Long> expectedTimestamps = new ArrayList<>(); + for (TimeRange timeRange : expectedResultTimeRange) { + if (scanOrder == Ordering.ASC) { + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + expectedTimestamps.add(i); + } + } else { + for (long i = timeRange.getMax(); i >= timeRange.getMin(); i--) { + expectedTimestamps.add(i); + } + } + } + List<Long> resultTimestamps = new ArrayList<>(expectedTimestamps.size()); + for (TimeRange timeRange : statisticsTimeRanges) { + memPointIterator.setCurrentPageTimeRange(timeRange); + while (memPointIterator.hasNextBatch()) { + TsBlock tsBlock = memPointIterator.nextBatch(); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long currentTimestamp = tsBlock.getTimeByIndex(i); + long value = tsBlock.getColumn(0).getLong(i); + Assert.assertEquals(currentTimestamp, value); + resultTimestamps.add(currentTimestamp); + } + } + } + Assert.assertEquals(expectedTimestamps, resultTimestamps); + + memPointIterator = chunk.createMemPointIterator(scanOrder, null); + + resultTimestamps.clear(); + for (TimeRange timeRange : statisticsTimeRanges) { + memPointIterator.setCurrentPageTimeRange(timeRange); + while (memPointIterator.hasNextTimeValuePair()) { + TimeValuePair timeValuePair = memPointIterator.nextTimeValuePair(); + Assert.assertEquals(timeValuePair.getTimestamp(), timeValuePair.getValues()[0]); + resultTimestamps.add(timeValuePair.getTimestamp()); + } + } + Assert.assertEquals(expectedTimestamps, resultTimestamps); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java index c22dcbb6b0f..b28979efd67 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/NonAlignedTVListIteratorTest.java @@ -630,4 +630,91 @@ public class NonAlignedTVListIteratorTest { } return tvListMap; } + + @Test + public void testSkipTimeRange() throws QueryProcessException, IOException { + List<Map<TVList, Integer>> list = + Arrays.asList( + buildNonAlignedSingleTvListMap( + Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))), + buildNonAlignedMultiTvListMap( + Arrays.asList(new TimeRange(10, 20), new TimeRange(22, 40))), + buildNonAlignedMultiTvListMap( + Arrays.asList( + new TimeRange(10, 20), + new TimeRange(10, 20), + new TimeRange(24, 30), + new TimeRange(22, 40)))); + for (Map<TVList, Integer> tvListMap : list) { + testSkipTimeRange( + tvListMap, + Ordering.ASC, + Arrays.asList(new TimeRange(11, 13), new TimeRange(21, 21), new TimeRange(33, 34)), + Arrays.asList(new TimeRange(11, 13), new TimeRange(33, 34))); + testSkipTimeRange( + tvListMap, + Ordering.DESC, + Arrays.asList(new TimeRange(33, 34), new TimeRange(21, 21), new TimeRange(11, 13)), + Arrays.asList(new TimeRange(33, 34), new TimeRange(11, 13))); + } + } + + private void testSkipTimeRange( + Map<TVList, Integer> tvListMap, + Ordering scanOrder, + List<TimeRange> statisticsTimeRanges, + List<TimeRange> expectedResultTimeRange) + throws QueryProcessException, IOException { + ReadOnlyMemChunk chunk = + new ReadOnlyMemChunk( + fragmentInstanceContext, + "s1", + TSDataType.INT64, + TSEncoding.PLAIN, + tvListMap, + null, + Collections.emptyList()); + chunk.sortTvLists(); + chunk.initChunkMetaFromTVListsWithFakeStatistics(); + MemPointIterator memPointIterator = chunk.createMemPointIterator(scanOrder, null); + List<Long> expectedTimestamps = new ArrayList<>(); + for (TimeRange timeRange : expectedResultTimeRange) { + if (scanOrder == Ordering.ASC) { + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + expectedTimestamps.add(i); + } + } else { + for (long i = timeRange.getMax(); i >= timeRange.getMin(); i--) { + expectedTimestamps.add(i); + } + } + } + List<Long> resultTimestamps = new ArrayList<>(expectedTimestamps.size()); + for (TimeRange timeRange : statisticsTimeRanges) { + memPointIterator.setCurrentPageTimeRange(timeRange); + while (memPointIterator.hasNextBatch()) { + TsBlock tsBlock = memPointIterator.nextBatch(); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long currentTimestamp = tsBlock.getTimeByIndex(i); + long value = tsBlock.getColumn(0).getLong(i); + Assert.assertEquals(currentTimestamp, value); + resultTimestamps.add(currentTimestamp); + } + } + } + Assert.assertEquals(expectedTimestamps, resultTimestamps); + + memPointIterator = chunk.createMemPointIterator(scanOrder, null); + + resultTimestamps.clear(); + for (TimeRange timeRange : statisticsTimeRanges) { + memPointIterator.setCurrentPageTimeRange(timeRange); + while (memPointIterator.hasNextTimeValuePair()) { + TimeValuePair timeValuePair = memPointIterator.nextTimeValuePair(); + Assert.assertEquals(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); + resultTimestamps.add(timeValuePair.getTimestamp()); + } + } + Assert.assertEquals(expectedTimestamps, resultTimestamps); + } }
