This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch optimizeMemTableRegionScan-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7641d6ea96316410f96318b4ab5659851705ee69 Author: shuwenwei <[email protected]> AuthorDate: Tue Dec 9 19:10:23 2025 +0800 Optimize memtable region scan (#16883) --- .../db/storageengine/dataregion/DataRegion.java | 6 +- .../dataregion/memtable/AbstractMemTable.java | 85 ++++-- .../memtable/AlignedWritableMemChunk.java | 114 ++++++-- .../dataregion/memtable/IMemTable.java | 6 +- .../dataregion/memtable/TsFileProcessor.java | 18 +- .../dataregion/memtable/WritableMemChunk.java | 52 ++-- .../memtable/WritableMemChunkRegionScanTest.java | 299 +++++++++++++++++++++ 7 files changed, 494 insertions(+), 86 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1856a5448b3..8ce2185d9ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2161,7 +2161,8 @@ public class DataRegion implements IDataRegionForQuery { } else { tsFileResource .getProcessor() - .queryForSeriesRegionScanWithoutLock(partialPaths, context, fileScanHandles); + .queryForSeriesRegionScanWithoutLock( + partialPaths, context, fileScanHandles, globalTimeFilter); } } return fileScanHandles; @@ -2238,7 +2239,8 @@ public class DataRegion implements IDataRegionForQuery { } else { tsFileResource .getProcessor() - .queryForDeviceRegionScanWithoutLock(devicePathsToContext, context, fileScanHandles); + .queryForDeviceRegionScanWithoutLock( + devicePathsToContext, context, fileScanHandles, globalTimeFilter); } } return fileScanHandles; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 
dcae00700ed..9d1078a471e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -67,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -451,7 +452,8 @@ public abstract class AbstractMemTable implements IMemTable { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetaDataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<Modification, IMemTable>> modsToMemTabled) { + List<Pair<Modification, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) { IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(fullPath.getDevicePath()); @@ -469,7 +471,12 @@ public abstract class AbstractMemTable implements IMemTable { (MeasurementPath) fullPath, this, modsToMemTabled, ttlLowerBound); } getMemChunkHandleFromMemTable( - deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, deletionList); + deviceID, + measurementId, + chunkMetaDataMap, + memChunkHandleMap, + deletionList, + globalTimeFilter); } else { if (!memTableMap.containsKey(deviceID)) { return; @@ -486,7 +493,8 @@ public abstract class AbstractMemTable implements IMemTable { ((AlignedPath) fullPath).getSchemaList(), chunkMetaDataMap, memChunkHandleMap, - deletionList); + deletionList, + globalTimeFilter); } } @@ -497,7 +505,8 @@ public abstract class AbstractMemTable implements IMemTable { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<Modification, IMemTable>> modsToMemTabled) + List<Pair<Modification, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) throws IllegalPathException { Map<IDeviceID, IWritableMemChunkGroup> memTableMap = 
getMemTableMap(); @@ -515,7 +524,8 @@ public abstract class AbstractMemTable implements IMemTable { chunkMetadataMap, memChunkHandleMap, ttlLowerBound, - modsToMemTabled); + modsToMemTabled, + globalTimeFilter); } else { getMemChunkHandleFromMemTable( deviceID, @@ -523,7 +533,8 @@ public abstract class AbstractMemTable implements IMemTable { chunkMetadataMap, memChunkHandleMap, ttlLowerBound, - modsToMemTabled); + modsToMemTabled, + globalTimeFilter); } } @@ -532,24 +543,30 @@ public abstract class AbstractMemTable implements IMemTable { String measurementId, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<TimeRange> deletionList) { + List<TimeRange> deletionList, + Filter globalTimeFilter) { WritableMemChunk memChunk = (WritableMemChunk) memTableMap.get(deviceID).getMemChunkMap().get(measurementId); - long[] timestamps = memChunk.getFilteredTimestamp(deletionList); + if (memChunk == null) { + return; + } + Optional<Long> anySatisfiedTimestamp = + memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter); + if (!anySatisfiedTimestamp.isPresent()) { + return; + } + long satisfiedTimestamp = anySatisfiedTimestamp.get(); chunkMetadataMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( - measurementId, - timestamps[0], - timestamps[timestamps.length - 1], - Collections.emptyList())); + buildFakeChunkMetaDataForFakeMemoryChunk( + measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList())); memChunkHandleMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) - .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps)); + .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp})); } private void getMemAlignedChunkHandleFromMemTable( @@ -557,7 +574,8 @@ public abstract class AbstractMemTable implements IMemTable { List<IMeasurementSchema> schemaList, Map<String, List<IChunkMetadata>> 
chunkMetadataList, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<List<TimeRange>> deletionList) { + List<List<TimeRange>> deletionList, + Filter globalTimeFilter) { AlignedWritableMemChunk alignedMemChunk = ((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk(); @@ -574,7 +592,11 @@ public abstract class AbstractMemTable implements IMemTable { } List<BitMap> bitMaps = new ArrayList<>(); - long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, bitMaps); + long[] timestamps = + alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, globalTimeFilter); + if (timestamps.length == 0) { + return; + } buildAlignedMemChunkHandle( deviceID, @@ -592,7 +614,8 @@ public abstract class AbstractMemTable implements IMemTable { Map<String, List<IChunkMetadata>> chunkMetadataList, Map<String, List<IChunkHandle>> memChunkHandleMap, long ttlLowerBound, - List<Pair<Modification, IMemTable>> modsToMemTabled) + List<Pair<Modification, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) throws IllegalPathException { AlignedWritableMemChunk memChunk = writableMemChunkGroup.getAlignedMemChunk(); @@ -611,7 +634,10 @@ public abstract class AbstractMemTable implements IMemTable { } List<BitMap> bitMaps = new ArrayList<>(); - long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps); + long[] timestamps = memChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, globalTimeFilter); + if (timestamps.length == 0) { + return; + } buildAlignedMemChunkHandle( deviceID, timestamps, @@ -628,7 +654,8 @@ public abstract class AbstractMemTable implements IMemTable { Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, long ttlLowerBound, - List<Pair<Modification, IMemTable>> modsToMemTabled) + List<Pair<Modification, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) throws IllegalPathException { for (Entry<String, IWritableMemChunk> entry : @@ -646,18 +673,20 @@ public 
abstract class AbstractMemTable implements IMemTable { modsToMemTabled, ttlLowerBound); } - long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList); + Optional<Long> anySatisfiedTimestamp = + writableMemChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter); + if (!anySatisfiedTimestamp.isPresent()) { + continue; + } + long satisfiedTimestamp = anySatisfiedTimestamp.get(); chunkMetadataMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( - measurementId, - timestamps[0], - timestamps[timestamps.length - 1], - Collections.emptyList())); + buildFakeChunkMetaDataForFakeMemoryChunk( + measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList())); memChunkHandleMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) - .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps)); + .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp})); } } @@ -681,7 +710,7 @@ public abstract class AbstractMemTable implements IMemTable { chunkMetadataList .computeIfAbsent(measurement, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( + buildFakeChunkMetaDataForFakeMemoryChunk( measurement, startEndTime[0], startEndTime[1], deletion)); chunkHandleMap .computeIfAbsent(measurement, k -> new ArrayList<>()) @@ -712,7 +741,7 @@ public abstract class AbstractMemTable implements IMemTable { return new long[] {startTime, endTime}; } - private IChunkMetadata buildChunkMetaDataForMemoryChunk( + private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk( String measurement, long startTime, long endTime, List<TimeRange> deletionList) { TimeStatistics timeStatistics = new TimeStatistics(); timeStatistics.setStartTime(startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index bca23f4df1f..88d55e5bdf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; @@ -52,6 +53,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; @@ -712,29 +714,89 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { return new Pair<>(reorderedColumnValues, reorderedBitMaps); } - private void filterDeletedTimeStamp( + public long[] getAnySatisfiedTimestamp( + List<List<TimeRange>> deletionList, List<BitMap> bitMaps, Filter globalTimeFilter) { + BitMap columnHasNonNullValue = new BitMap(schemaList.size()); + AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0); + Map<Long, BitMap> timestampWithBitmap = new TreeMap<>(); + + getAnySatisfiedTimestamp( + list, + deletionList, + timestampWithBitmap, + globalTimeFilter, + columnHasNonNullValue, + hasNonNullValueColumnCount); + for (int i = 0; + i < sortedList.size() && hasNonNullValueColumnCount.get() < schemaList.size(); + i++) { + getAnySatisfiedTimestamp( + sortedList.get(i), + deletionList, + timestampWithBitmap, + globalTimeFilter, + columnHasNonNullValue, + hasNonNullValueColumnCount); + } + + long[] timestamps = new long[timestampWithBitmap.size()]; 
+ int idx = 0; + for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) { + timestamps[idx++] = entry.getKey(); + bitMaps.add(entry.getValue()); + } + return timestamps; + } + + private void getAnySatisfiedTimestamp( AlignedTVList alignedTVList, List<List<TimeRange>> valueColumnsDeletionList, - Map<Long, BitMap> timestampWithBitmap) { + Map<Long, BitMap> timestampWithBitmap, + Filter globalTimeFilter, + BitMap columnHasNonNullValue, + AtomicInteger hasNonNullValueColumnCount) { + if (globalTimeFilter != null + && !globalTimeFilter.satisfyStartEndTime( + alignedTVList.getMinTime(), alignedTVList.getMaxTime())) { + return; + } BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); int rowCount = alignedTVList.rowCount(); List<int[]> valueColumnDeleteCursor = new ArrayList<>(); if (valueColumnsDeletionList != null) { valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0})); } + // example: + // globalTimeFilter:null, ignoreAllNullRows: true + // tvList: + // time s1 s2 s3 + // 1 1 null null + // 2 null 1 null + // 2 1 1 null + // 3 1 null null + // 4 1 null 1 + // timestampWithBitmap: + // timestamp: 1 bitmap: 011 + // timestamp: 2 bitmap: 101 + // timestamp: 4 bitmap: 110 for (int row = 0; row < rowCount; row++) { // the row is deleted if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) { continue; } long timestamp = alignedTVList.getTime(row); + if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, null)) { + continue; + } + // Note that this method will only perform bitmap unmarking on the first occurrence of a + // non-null value in multiple timestamps for the same column. 
+ BitMap currentRowNullValueBitmap = null; - BitMap bitMap = new BitMap(schemaList.size()); for (int column = 0; column < schemaList.size(); column++) { if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { - bitMap.mark(column); + continue; } // skip deleted row @@ -744,32 +806,36 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { timestamp, valueColumnsDeletionList.get(column), valueColumnDeleteCursor.get(column))) { - bitMap.mark(column); - } - - // skip all-null row - if (bitMap.isAllMarked()) { continue; } - timestampWithBitmap.put(timestamp, bitMap); + if (!columnHasNonNullValue.isMarked(column)) { + hasNonNullValueColumnCount.incrementAndGet(); + columnHasNonNullValue.mark(column); + currentRowNullValueBitmap = + currentRowNullValueBitmap != null + ? currentRowNullValueBitmap + : timestampWithBitmap.computeIfAbsent( + timestamp, k -> getAllMarkedBitmap(schemaList.size())); + currentRowNullValueBitmap.unmark(column); + } } - } - } - public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList, List<BitMap> bitMaps) { - Map<Long, BitMap> timestampWithBitmap = new TreeMap<>(); + if (currentRowNullValueBitmap == null) { + continue; + } + // found new column with non-null value + timestampWithBitmap.put(timestamp, currentRowNullValueBitmap); - filterDeletedTimeStamp(list, deletionList, timestampWithBitmap); - for (AlignedTVList alignedTVList : sortedList) { - filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap); + if (hasNonNullValueColumnCount.get() == schemaList.size()) { + return; + } } + } - List<Long> filteredTimestamps = new ArrayList<>(); - for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) { - filteredTimestamps.add(entry.getKey()); - bitMaps.add(entry.getValue()); - } - return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); + private BitMap getAllMarkedBitmap(int size) { + BitMap bitMap = new BitMap(size); + bitMap.markAll(); + return bitMap; } // 
Choose maximum avgPointSizeOfLargestColumn among working and sorted AlignedTVList as diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 3a7e9c55092..cd1e9f14432 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -125,7 +125,8 @@ public interface IMemTable extends WALEntryValue { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<Modification, IMemTable>> modsToMemtabled) + List<Pair<Modification, IMemTable>> modsToMemtabled, + Filter globalTimeFilter) throws IOException, QueryProcessException, MetadataException; void queryForDeviceRegionScan( @@ -134,7 +135,8 @@ public interface IMemTable extends WALEntryValue { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<Modification, IMemTable>> modsToMemtabled) + List<Pair<Modification, IMemTable>> modsToMemtabled, + Filter globalTimeFilter) throws IOException, QueryProcessException, MetadataException; /** putBack all the memory resources. 
*/ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index a508ec48a88..6bbcb427f87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1826,7 +1826,8 @@ public class TsFileProcessor { public void queryForSeriesRegionScanWithoutLock( List<PartialPath> pathList, QueryContext queryContext, - List<IFileScanHandle> fileScanHandlesForQuery) { + List<IFileScanHandle> fileScanHandlesForQuery, + Filter globalTimeFilter) { long startTime = System.nanoTime(); try { Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap = new HashMap<>(); @@ -1846,7 +1847,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetaList, measurementToChunkHandleList, - modsToMemtable); + modsToMemtable, + globalTimeFilter); } if (workMemTable != null) { workMemTable.queryForSeriesRegionScan( @@ -1854,7 +1856,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetaList, measurementToChunkHandleList, - null); + null, + globalTimeFilter); } IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(seriesPath.getDevice()); // Some memTable have been flushed already, so we need to get the chunk metadata from @@ -1905,7 +1908,8 @@ public class TsFileProcessor { public void queryForDeviceRegionScanWithoutLock( Map<IDeviceID, DeviceContext> devicePathsToContext, QueryContext queryContext, - List<IFileScanHandle> fileScanHandlesForQuery) { + List<IFileScanHandle> fileScanHandlesForQuery, + Filter globalTimeFilter) { long startTime = System.nanoTime(); try { Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap = new HashMap<>(); @@ -1930,7 +1934,8 @@ public class 
TsFileProcessor { timeLowerBound, measurementToChunkMetadataList, measurementToMemChunkHandleList, - modsToMemtable); + modsToMemtable, + globalTimeFilter); } if (workMemTable != null) { workMemTable.queryForDeviceRegionScan( @@ -1939,7 +1944,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetadataList, measurementToMemChunkHandleList, - null); + null, + globalTimeFilter); } buildChunkHandleForFlushedMemTable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 01e91387612..c19ad4f5a22 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -43,10 +44,9 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.stream.Collectors; import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; @@ -554,11 +554,30 @@ public class WritableMemChunk extends AbstractWritableMemChunk { return sortedList; } - private void filterDeletedTimestamp( - TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) { - long lastTime = Long.MIN_VALUE; + public Optional<Long> 
getAnySatisfiedTimestamp( + List<TimeRange> deletionList, Filter globalTimeFilter) { + Optional<Long> anySatisfiedTimestamp = + getAnySatisfiedTimestamp(list, deletionList, globalTimeFilter); + if (anySatisfiedTimestamp.isPresent()) { + return anySatisfiedTimestamp; + } + for (TVList tvList : sortedList) { + anySatisfiedTimestamp = getAnySatisfiedTimestamp(tvList, deletionList, globalTimeFilter); + if (anySatisfiedTimestamp.isPresent()) { + break; + } + } + return anySatisfiedTimestamp; + } + + private Optional<Long> getAnySatisfiedTimestamp( + TVList tvlist, List<TimeRange> deletionList, Filter globalTimeFilter) { int[] deletionCursor = {0}; int rowCount = tvlist.rowCount(); + if (globalTimeFilter != null + && !globalTimeFilter.satisfyStartEndTime(tvlist.getMinTime(), tvlist.getMaxTime())) { + return Optional.empty(); + } for (int i = 0; i < rowCount; i++) { if (tvlist.getBitMap() != null && tvlist.isNullValue(tvlist.getValueIndex(i))) { continue; @@ -568,26 +587,11 @@ public class WritableMemChunk extends AbstractWritableMemChunk { && ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)) { continue; } - - if (i == rowCount - 1 || curTime != lastTime) { - timestampList.add(curTime); + if (globalTimeFilter != null && !globalTimeFilter.satisfy(curTime, null)) { + continue; } - lastTime = curTime; + return Optional.of(curTime); } - } - - public long[] getFilteredTimestamp(List<TimeRange> deletionList) { - List<Long> timestampList = new ArrayList<>(); - filterDeletedTimestamp(list, deletionList, timestampList); - for (TVList tvList : sortedList) { - filterDeletedTimestamp(tvList, deletionList, timestampList); - } - - // remove duplicated time - List<Long> distinctTimestamps = timestampList.stream().distinct().collect(Collectors.toList()); - // sort timestamps - long[] filteredTimestamps = distinctTimestamps.stream().mapToLong(Long::longValue).toArray(); - Arrays.sort(filteredTimestamps); - return filteredTimestamps; + return Optional.empty(); } } 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java new file mode 100644 index 00000000000..63bbac18e38 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.memtable; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.PlainDeviceID; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.operator.TimeFilterOperators; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +@RunWith(Parameterized.class) +public class WritableMemChunkRegionScanTest { + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] {{0}, {1000}, {10000}, {20000}}); + } + + private int defaultTvListThreshold; + private int tvListSortThreshold; + + public WritableMemChunkRegionScanTest(int tvListSortThreshold) { + this.tvListSortThreshold = tvListSortThreshold; + } + + @Before + public void setup() { + defaultTvListThreshold = IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold(); + IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(tvListSortThreshold); + } + + @After + public void tearDown() { + IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(defaultTvListThreshold); + } + + @Test + 
public void testAlignedWritableMemChunkRegionScan() throws IllegalPathException { + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + try { + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + AlignedWritableMemChunk writableMemChunk = null; + int size = 100000; + for (int i = 0; i < size; i++) { + if (i <= 10000) { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {1, null, 1}); + } else if (i <= 20000) { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), + measurementSchemas, + i, + new Object[] {null, null, 2}); + } else if (i <= 30000) { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), + measurementSchemas, + i, + new Object[] {3, null, null}); + } else { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {4, 4, 4}); + } + } + writableMemChunk = + (AlignedWritableMemChunk) + memTable.getWritableMemChunk(new PlainDeviceID("root.test.d1"), ""); + List<BitMap> bitMaps = new ArrayList<>(); + long[] timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + null); + Assert.assertEquals(2, timestamps.length); + Assert.assertEquals(0, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + Assert.assertTrue(bitMaps.get(1).isMarked(0)); + Assert.assertFalse(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(30001, timestamps[1]); + + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + 
Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + new TimeFilterOperators.TimeGt(10000000)); + Assert.assertEquals(0, timestamps.length); + + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + new TimeFilterOperators.TimeGt(11000)); + + Assert.assertEquals(3, timestamps.length); + Assert.assertEquals(12001, timestamps[0]); + Assert.assertTrue(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + Assert.assertEquals(20001, timestamps[1]); + Assert.assertFalse(bitMaps.get(1).isMarked(0)); + Assert.assertTrue(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(30001, timestamps[2]); + Assert.assertTrue(bitMaps.get(2).isMarked(0)); + Assert.assertFalse(bitMaps.get(2).isMarked(1)); + Assert.assertTrue(bitMaps.get(2).isMarked(2)); + + writableMemChunk.writeAlignedPoints( + 1000001, new Object[] {1, null, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints( + 1000002, new Object[] {null, 1, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints(1000002, new Object[] {1, 1, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints( + 1000003, new Object[] {1, null, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints(1000004, new Object[] {1, null, 1}, measurementSchemas); + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + new TimeFilterOperators.TimeGt(1000000)); + Assert.assertEquals(3, timestamps.length); + Assert.assertEquals(1000001, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + 
Assert.assertTrue(bitMaps.get(0).isMarked(2)); + Assert.assertEquals(1000002, timestamps[1]); + Assert.assertTrue(bitMaps.get(1).isMarked(0)); + Assert.assertFalse(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(1000004, timestamps[2]); + Assert.assertTrue(bitMaps.get(2).isMarked(0)); + Assert.assertTrue(bitMaps.get(2).isMarked(1)); + Assert.assertFalse(bitMaps.get(2).isMarked(2)); + + Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>(); + memTable.queryForDeviceRegionScan( + new PlainDeviceID("root.test.d1"), + true, + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1000000)); + Assert.assertEquals(3, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {1000001, 1000001}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000002, 1000002}, chunkHandleMap.get("s2").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000004, 1000004}, chunkHandleMap.get("s3").get(0).getPageStatisticsTime()); + + memTable.queryForSeriesRegionScan( + new AlignedPath( + "root.test.d1", + measurementSchemas.stream() + .map(IMeasurementSchema::getMeasurementId) + .collect(Collectors.toList()), + measurementSchemas), + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1000000)); + Assert.assertEquals(3, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {1000001, 1000001}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000002, 1000002}, chunkHandleMap.get("s2").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000004, 1000004}, chunkHandleMap.get("s3").get(0).getPageStatisticsTime()); + } finally { + memTable.release(); + } + } + + @Test + public void testNonAlignedWritableMemChunkRegionScan() throws IllegalPathException 
{ + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + try { + MeasurementSchema measurementSchema = new MeasurementSchema("s1", TSDataType.INT32); + int size = 100000; + for (int i = 0; i < size; i++) { + memTable.write( + new PlainDeviceID("root.test.d1"), + Collections.singletonList(measurementSchema), + i, + new Object[] {i}); + } + WritableMemChunk writableMemChunk = + (WritableMemChunk) memTable.getWritableMemChunk(new PlainDeviceID("root.test.d1"), "s1"); + Optional<Long> timestamp = writableMemChunk.getAnySatisfiedTimestamp(null, null); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(0, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + null, new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L)); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(1000, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + Collections.singletonList(new TimeRange(1, 1500)), + new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L)); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(1501, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + Collections.singletonList(new TimeRange(1, 1500)), + new TimeFilterOperators.TimeBetweenAnd(100000L, 200000L)); + Assert.assertFalse(timestamp.isPresent()); + + Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>(); + memTable.queryForDeviceRegionScan( + new PlainDeviceID("root.test.d1"), + false, + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1)); + Assert.assertEquals(1, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {2, 2}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + memTable.queryForSeriesRegionScan( + new MeasurementPath("root.test.d1", "s1", measurementSchema), + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + 
Collections.emptyList(), + new TimeFilterOperators.TimeGt(1)); + Assert.assertEquals(1, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {2, 2}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + } finally { + memTable.release(); + } + } +}
