This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch optimizeMemTableRegionScan in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e60335cc4dbe8b8f53e31c08216e24573cd75540 Author: shuwenwei <[email protected]> AuthorDate: Tue Dec 9 12:18:34 2025 +0800 optimize memtable region scan --- .../db/storageengine/dataregion/DataRegion.java | 6 +- .../dataregion/memtable/AbstractMemTable.java | 83 +++++--- .../memtable/AlignedWritableMemChunk.java | 100 +++++++--- .../dataregion/memtable/IMemTable.java | 6 +- .../dataregion/memtable/TsFileProcessor.java | 18 +- .../dataregion/memtable/WritableMemChunk.java | 52 ++--- .../memtable/AlignedTVListIteratorTest.java | 29 +++ .../memtable/WritableMemChunkRegionScanTest.java | 219 +++++++++++++++++++++ 8 files changed, 429 insertions(+), 84 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 91c36262cbd..9293f9867b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2435,7 +2435,8 @@ public class DataRegion implements IDataRegionForQuery { } else { tsFileResource .getProcessor() - .queryForSeriesRegionScanWithoutLock(partialPaths, context, fileScanHandles); + .queryForSeriesRegionScanWithoutLock( + partialPaths, context, fileScanHandles, globalTimeFilter); } } return fileScanHandles; @@ -2512,7 +2513,8 @@ public class DataRegion implements IDataRegionForQuery { } else { tsFileResource .getProcessor() - .queryForDeviceRegionScanWithoutLock(devicePathsToContext, context, fileScanHandles); + .queryForDeviceRegionScanWithoutLock( + devicePathsToContext, context, fileScanHandles, globalTimeFilter); } } return fileScanHandles; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 92803984cee..78c918f4463 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -70,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -488,7 +489,8 @@ public abstract class AbstractMemTable implements IMemTable { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetaDataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<ModEntry, IMemTable>> modsToMemTabled) { + List<Pair<ModEntry, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) { IDeviceID deviceID = fullPath.getDeviceId(); if (fullPath instanceof NonAlignedFullPath) { @@ -506,7 +508,12 @@ public abstract class AbstractMemTable implements IMemTable { fullPath.getDeviceId(), measurementId, this, modsToMemTabled, ttlLowerBound); } getMemChunkHandleFromMemTable( - deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, deletionList); + deviceID, + measurementId, + chunkMetaDataMap, + memChunkHandleMap, + deletionList, + globalTimeFilter); } else { // check If MemTable Contains this path if (!memTableMap.containsKey(deviceID)) { @@ -528,7 +535,8 @@ public abstract class AbstractMemTable implements IMemTable { ((AlignedFullPath) fullPath).getSchemaList(), chunkMetaDataMap, memChunkHandleMap, - deletionList); + deletionList, + globalTimeFilter); } } @@ -539,7 +547,8 @@ public abstract class AbstractMemTable implements IMemTable { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<ModEntry, IMemTable>> modsToMemTabled) { + 
List<Pair<ModEntry, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) { Map<IDeviceID, IWritableMemChunkGroup> memTableMap = getMemTableMap(); @@ -556,7 +565,8 @@ public abstract class AbstractMemTable implements IMemTable { chunkMetadataMap, memChunkHandleMap, ttlLowerBound, - modsToMemTabled); + modsToMemTabled, + globalTimeFilter); } else { getMemChunkHandleFromMemTable( deviceID, @@ -564,7 +574,8 @@ public abstract class AbstractMemTable implements IMemTable { chunkMetadataMap, memChunkHandleMap, ttlLowerBound, - modsToMemTabled); + modsToMemTabled, + globalTimeFilter); } } @@ -573,24 +584,27 @@ public abstract class AbstractMemTable implements IMemTable { String measurementId, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<TimeRange> deletionList) { + List<TimeRange> deletionList, + Filter globalTimeFilter) { WritableMemChunk memChunk = (WritableMemChunk) memTableMap.get(deviceID).getMemChunkMap().get(measurementId); - long[] timestamps = memChunk.getFilteredTimestamp(deletionList); + Optional<Long> anySatisfiedTimestamp = + memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter); + if (!anySatisfiedTimestamp.isPresent()) { + return; + } + long satisfiedTimestamp = anySatisfiedTimestamp.get(); chunkMetadataMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( - measurementId, - timestamps[0], - timestamps[timestamps.length - 1], - Collections.emptyList())); + buildFakeChunkMetaDataForFakeMemoryChunk( + measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList())); memChunkHandleMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) - .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps)); + .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp})); } private void getMemAlignedChunkHandleFromMemTable( @@ -598,7 +612,8 @@ public abstract class AbstractMemTable implements 
IMemTable { List<IMeasurementSchema> schemaList, Map<String, List<IChunkMetadata>> chunkMetadataList, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<List<TimeRange>> deletionList) { + List<List<TimeRange>> deletionList, + Filter globalTimeFilter) { AlignedWritableMemChunk alignedMemChunk = ((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk(); @@ -615,7 +630,11 @@ public abstract class AbstractMemTable implements IMemTable { } List<BitMap> bitMaps = new ArrayList<>(); - long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, bitMaps, true); + long[] timestamps = + alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, globalTimeFilter); + if (timestamps.length == 0) { + return; + } buildAlignedMemChunkHandle( deviceID, @@ -633,7 +652,8 @@ public abstract class AbstractMemTable implements IMemTable { Map<String, List<IChunkMetadata>> chunkMetadataList, Map<String, List<IChunkHandle>> memChunkHandleMap, long ttlLowerBound, - List<Pair<ModEntry, IMemTable>> modsToMemTabled) { + List<Pair<ModEntry, IMemTable>> modsToMemTabled, + Filter globalTimeFilter) { AlignedWritableMemChunk memChunk = writableMemChunkGroup.getAlignedMemChunk(); List<IMeasurementSchema> schemaList = memChunk.getSchemaList(); @@ -648,7 +668,11 @@ public abstract class AbstractMemTable implements IMemTable { } List<BitMap> bitMaps = new ArrayList<>(); - long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps, true); + long[] timestamps = + memChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, globalTimeFilter); + if (timestamps.length == 0) { + return; + } buildAlignedMemChunkHandle( deviceID, timestamps, @@ -665,7 +689,8 @@ public abstract class AbstractMemTable implements IMemTable { Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, long ttlLowerBound, - List<Pair<ModEntry, IMemTable>> modsToMemTabled) { + List<Pair<ModEntry, IMemTable>> modsToMemTabled, + 
Filter globalTimeFilter) { for (Entry<String, IWritableMemChunk> entry : writableMemChunkGroup.getMemChunkMap().entrySet()) { @@ -679,18 +704,20 @@ public abstract class AbstractMemTable implements IMemTable { ModificationUtils.constructDeletionList( deviceID, measurementId, this, modsToMemTabled, ttlLowerBound); } - long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList); + Optional<Long> anySatisfiedTimestamp = + writableMemChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter); + if (!anySatisfiedTimestamp.isPresent()) { + continue; + } + long satisfiedTimestamp = anySatisfiedTimestamp.get(); chunkMetadataMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( - measurementId, - timestamps[0], - timestamps[timestamps.length - 1], - Collections.emptyList())); + buildFakeChunkMetaDataForFakeMemoryChunk( + measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList())); memChunkHandleMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) - .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps)); + .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp})); } } @@ -714,7 +741,7 @@ public abstract class AbstractMemTable implements IMemTable { chunkMetadataList .computeIfAbsent(measurement, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( + buildFakeChunkMetaDataForFakeMemoryChunk( measurement, startEndTime[0], startEndTime[1], deletion)); chunkHandleMap .computeIfAbsent(measurement, k -> new ArrayList<>()) @@ -745,7 +772,7 @@ public abstract class AbstractMemTable implements IMemTable { return new long[] {startTime, endTime}; } - private IChunkMetadata buildChunkMetaDataForMemoryChunk( + private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk( String measurement, long startTime, long endTime, List<TimeRange> deletionList) { TimeStatistics timeStatistics = new TimeStatistics(); timeStatistics.setStartTime(startTime); 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index f81238bef71..2cbc6091e8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -34,6 +34,7 @@ import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; @@ -55,6 +56,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; @@ -287,13 +289,63 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { return new Pair<>(reorderedColumnValues, reorderedBitMaps); } - private void filterDeletedTimeStamp( + public long[] getAnySatisfiedTimestamp( + List<List<TimeRange>> deletionList, + List<BitMap> bitMaps, + boolean ignoreAllNullRows, + Filter globalTimeFilter) { + BitMap columnHasNonNullValue = new BitMap(schemaList.size()); + AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0); + Map<Long, BitMap> timestampWithBitmap = new TreeMap<>(); + + getAnySatisfiedTimestamp( + list, + deletionList, + ignoreAllNullRows, + timestampWithBitmap, + globalTimeFilter, + columnHasNonNullValue, + hasNonNullValueColumnCount); + for (int i = 0; + i < sortedList.size() && hasNonNullValueColumnCount.get() < schemaList.size(); + i++) { + 
if (!ignoreAllNullRows && !timestampWithBitmap.isEmpty()) { + // count devices in table model + break; + } + getAnySatisfiedTimestamp( + sortedList.get(i), + deletionList, + ignoreAllNullRows, + timestampWithBitmap, + globalTimeFilter, + columnHasNonNullValue, + hasNonNullValueColumnCount); + } + + long[] timestamps = new long[timestampWithBitmap.size()]; + int idx = 0; + for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) { + timestamps[idx++] = entry.getKey(); + bitMaps.add(entry.getValue()); + } + return timestamps; + } + + private void getAnySatisfiedTimestamp( AlignedTVList alignedTVList, List<List<TimeRange>> valueColumnsDeletionList, boolean ignoreAllNullRows, - Map<Long, BitMap> timestampWithBitmap) { + Map<Long, BitMap> timestampWithBitmap, + Filter globalTimeFilter, + BitMap columnHasNonNullValue, + AtomicInteger hasNonNullValueColumnCount) { + if (globalTimeFilter != null + && !globalTimeFilter.satisfyStartEndTime( + alignedTVList.getMinTime(), alignedTVList.getMaxTime())) { + return; + } BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); - int rowCount = alignedTVList.rowCount(); List<int[]> valueColumnDeleteCursor = new ArrayList<>(); if (valueColumnsDeletionList != null) { @@ -306,11 +358,17 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { continue; } long timestamp = alignedTVList.getTime(row); + if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, null)) { + continue; + } BitMap bitMap = new BitMap(schemaList.size()); + + boolean foundAnyNewColumnWithNonNullValue = false; for (int column = 0; column < schemaList.size(); column++) { if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { bitMap.mark(column); + continue; } // skip deleted row @@ -321,32 +379,30 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { valueColumnsDeletionList.get(column), valueColumnDeleteCursor.get(column))) { bitMap.mark(column); + continue; } - // 
skip all-null row - if (ignoreAllNullRows && bitMap.isAllMarked()) { - continue; + if (!columnHasNonNullValue.isMarked(column)) { + hasNonNullValueColumnCount.incrementAndGet(); + foundAnyNewColumnWithNonNullValue = true; + columnHasNonNullValue.mark(column); } - timestampWithBitmap.put(timestamp, bitMap); } - } - } - - public long[] getFilteredTimestamp( - List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean ignoreAllNullRows) { - Map<Long, BitMap> timestampWithBitmap = new TreeMap<>(); - filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap); - for (AlignedTVList alignedTVList : sortedList) { - filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap); - } + if (!ignoreAllNullRows) { + // count devices in table model + timestampWithBitmap.put(timestamp, bitMap); + return; + } + if (!foundAnyNewColumnWithNonNullValue) { + continue; + } + timestampWithBitmap.put(timestamp, bitMap); - List<Long> filteredTimestamps = new ArrayList<>(); - for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) { - filteredTimestamps.add(entry.getKey()); - bitMaps.add(entry.getValue()); + if (hasNonNullValueColumnCount.get() == schemaList.size()) { + return; + } } - return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 6c6e09c5782..fd9ffe90b0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -125,7 +125,8 @@ public interface IMemTable extends WALEntryValue { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> 
memChunkHandleMap, - List<Pair<ModEntry, IMemTable>> modsToMemtabled) + List<Pair<ModEntry, IMemTable>> modsToMemtabled, + Filter globalTimeFilter) throws IOException, QueryProcessException, MetadataException; void queryForDeviceRegionScan( @@ -134,7 +135,8 @@ public interface IMemTable extends WALEntryValue { long ttlLowerBound, Map<String, List<IChunkMetadata>> chunkMetadataMap, Map<String, List<IChunkHandle>> memChunkHandleMap, - List<Pair<ModEntry, IMemTable>> modsToMemtabled) + List<Pair<ModEntry, IMemTable>> modsToMemtabled, + Filter globalTimeFilter) throws IOException, QueryProcessException, MetadataException; /** putBack all the memory resources. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 68776c92c0b..36cbb4e0f88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1974,7 +1974,8 @@ public class TsFileProcessor { public void queryForSeriesRegionScanWithoutLock( List<IFullPath> pathList, QueryContext queryContext, - List<IFileScanHandle> fileScanHandlesForQuery) { + List<IFileScanHandle> fileScanHandlesForQuery, + Filter globalTimeFilter) { long startTime = System.nanoTime(); try { Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap = new HashMap<>(); @@ -1995,7 +1996,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetaList, measurementToChunkHandleList, - modsToMemtable); + modsToMemtable, + globalTimeFilter); } if (workMemTable != null) { workMemTable.queryForSeriesRegionScan( @@ -2003,7 +2005,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetaList, measurementToChunkHandleList, - null); + null, + 
globalTimeFilter); } IDeviceID deviceID = seriesPath.getDeviceId(); // Some memTable have been flushed already, so we need to get the chunk metadata from @@ -2054,7 +2057,8 @@ public class TsFileProcessor { public void queryForDeviceRegionScanWithoutLock( Map<IDeviceID, DeviceContext> devicePathsToContext, QueryContext queryContext, - List<IFileScanHandle> fileScanHandlesForQuery) { + List<IFileScanHandle> fileScanHandlesForQuery, + Filter globalTimeFilter) { long startTime = System.nanoTime(); try { Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap = new HashMap<>(); @@ -2077,7 +2081,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetadataList, measurementToMemChunkHandleList, - modsToMemtable); + modsToMemtable, + globalTimeFilter); } if (workMemTable != null) { workMemTable.queryForDeviceRegionScan( @@ -2086,7 +2091,8 @@ public class TsFileProcessor { timeLowerBound, measurementToChunkMetadataList, measurementToMemChunkHandleList, - null); + null, + globalTimeFilter); } buildChunkHandleForFlushedMemTable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index ebf23154d15..dbc3183df80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -35,6 +35,7 @@ import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -48,10 +49,9 @@ import 
java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.stream.Collectors; import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; @@ -574,11 +574,30 @@ public class WritableMemChunk extends AbstractWritableMemChunk { return sortedList; } - private void filterDeletedTimestamp( - TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) { - long lastTime = Long.MIN_VALUE; + public Optional<Long> getAnySatisfiedTimestamp( + List<TimeRange> deletionList, Filter globalTimeFilter) { + Optional<Long> anySatisfiedTimestamp = + getAnySatisfiedTimestamp(list, deletionList, globalTimeFilter); + if (anySatisfiedTimestamp.isPresent()) { + return anySatisfiedTimestamp; + } + for (TVList tvList : sortedList) { + anySatisfiedTimestamp = getAnySatisfiedTimestamp(tvList, deletionList, globalTimeFilter); + if (anySatisfiedTimestamp.isPresent()) { + break; + } + } + return anySatisfiedTimestamp; + } + + private Optional<Long> getAnySatisfiedTimestamp( + TVList tvlist, List<TimeRange> deletionList, Filter globalTimeFilter) { int[] deletionCursor = {0}; int rowCount = tvlist.rowCount(); + if (globalTimeFilter != null + && !globalTimeFilter.satisfyStartEndTime(tvlist.getMinTime(), tvlist.getMaxTime())) { + return Optional.empty(); + } for (int i = 0; i < rowCount; i++) { if (tvlist.getBitMap() != null && tvlist.isNullValue(tvlist.getValueIndex(i))) { continue; @@ -588,27 +607,12 @@ public class WritableMemChunk extends AbstractWritableMemChunk { && ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)) { continue; } - - if (i == rowCount - 1 || curTime != lastTime) { - timestampList.add(curTime); + if (globalTimeFilter != null && !globalTimeFilter.satisfy(curTime, null)) { + continue; } - lastTime = curTime; + return Optional.of(curTime); } - } - - public 
long[] getFilteredTimestamp(List<TimeRange> deletionList) { - List<Long> timestampList = new ArrayList<>(); - filterDeletedTimestamp(list, deletionList, timestampList); - for (TVList tvList : sortedList) { - filterDeletedTimestamp(tvList, deletionList, timestampList); - } - - // remove duplicated time - List<Long> distinctTimestamps = timestampList.stream().distinct().collect(Collectors.toList()); - // sort timestamps - long[] filteredTimestamps = distinctTimestamps.stream().mapToLong(Long::longValue).toArray(); - Arrays.sort(filteredTimestamps); - return filteredTimestamps; + return Optional.empty(); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java index b0c383e3213..915ee8d7f5f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java @@ -44,7 +44,9 @@ import org.apache.tsfile.read.filter.operator.LongFilterOperators; import org.apache.tsfile.read.filter.operator.TimeFilterOperators; import org.apache.tsfile.read.reader.series.PaginationController; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.apache.tsfile.write.schema.VectorMeasurementSchema; import org.junit.AfterClass; import org.junit.Assert; @@ -1009,4 +1011,31 @@ public class AlignedTVListIteratorTest { } Assert.assertEquals(expectedCount, count); } + + @Test + public void test() { + MeasurementSchema measurementSchema = new MeasurementSchema("s1", TSDataType.INT32); + WritableMemChunk writableMemChunk = new 
WritableMemChunk(measurementSchema); + Tablet tablet = new Tablet("root.test.d1", Collections.singletonList(measurementSchema)); + int size = 3000000; + for (int i = 0; i < size; i++) { + writableMemChunk.writeNonAlignedPoint(i, i); + } + for (int j = 0; j < 10; j++) { + // for (int i = 0; i < size; i++) { + // if (tablet.getRowSize() == tablet.getMaxRowNumber() || i == size - 1) { + // writableMemChunk.writeNonAlignedTablet(tablet.getTimestamps(), + // tablet.getValues()[0], tablet.getBitMaps()[0], TSDataType.INT32, 0, tablet.getRowSize()); + // tablet.reset(); + // } + // int rowIdx = i % tablet.getMaxRowNumber(); + // tablet.addTimestamp(rowIdx, i); + // tablet.addValue(rowIdx, 0, i); + // } + long start = System.currentTimeMillis(); + // long[] filteredTimestamp = writableMemChunk.getAnySatisfiedTimestamp(null, null); + writableMemChunk.getAnySatisfiedTimestamp(null, null); + System.out.println("cost: " + (System.currentTimeMillis() - start)); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java new file mode 100644 index 00000000000..4130e3032e8 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.memtable; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.operator.TimeFilterOperators; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +@RunWith(Parameterized.class) +public class WritableMemChunkRegionScanTest { + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] {{0}, {1000}, {10000}, {20000}}); + } + + private int defaultTvListThreshold; + private int tvListSortThreshold; + + public WritableMemChunkRegionScanTest(int tvListSortThreshold) { + this.tvListSortThreshold = tvListSortThreshold; + } + + @Before + public void setup() { + defaultTvListThreshold = IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold(); + IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(tvListSortThreshold); + } + + @After + public void tearDown() { + 
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(defaultTvListThreshold); + } + + @Test + public void testAlignedWritableMemChunkRegionScan() { + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + AlignedWritableMemChunk writableMemChunk = + new AlignedWritableMemChunk(measurementSchemas, false); + int size = 100000; + for (int i = 0; i < size; i++) { + if (i <= 10000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {1, null, 1}, measurementSchemas); + } else if (i <= 20000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {null, null, 2}, measurementSchemas); + } else if (i <= 30000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {3, null, null}, measurementSchemas); + } else { + writableMemChunk.writeAlignedPoints(i, new Object[] {4, 4, 4}, measurementSchemas); + } + } + List<BitMap> bitMaps = new ArrayList<>(); + long[] timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + true, + null); + Assert.assertEquals(2, timestamps.length); + Assert.assertEquals(0, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + Assert.assertFalse(bitMaps.get(1).isMarked(0)); + Assert.assertFalse(bitMaps.get(1).isMarked(1)); + Assert.assertFalse(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(30001, timestamps[1]); + + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + true, + new TimeFilterOperators.TimeGt(11000)); + + Assert.assertEquals(3, timestamps.length); + 
Assert.assertEquals(12001, timestamps[0]); + Assert.assertTrue(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + Assert.assertEquals(20001, timestamps[1]); + Assert.assertFalse(bitMaps.get(1).isMarked(0)); + Assert.assertTrue(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(30001, timestamps[2]); + Assert.assertFalse(bitMaps.get(2).isMarked(0)); + Assert.assertFalse(bitMaps.get(2).isMarked(1)); + Assert.assertFalse(bitMaps.get(2).isMarked(2)); + } + + @Test + public void testTableWritableMemChunkRegionScan() { + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + AlignedWritableMemChunk writableMemChunk = + new AlignedWritableMemChunk(measurementSchemas, true); + int size = 100000; + for (int i = 0; i < size; i++) { + if (i <= 10000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {1, null, 1}, measurementSchemas); + } else if (i <= 20000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {null, null, 2}, measurementSchemas); + } else if (i <= 30000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {3, null, null}, measurementSchemas); + } else { + writableMemChunk.writeAlignedPoints(i, new Object[] {4, 4, 4}, measurementSchemas); + } + } + List<BitMap> bitMaps = new ArrayList<>(); + long[] timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + false, + null); + Assert.assertEquals(1, timestamps.length); + Assert.assertEquals(0, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + + bitMaps = new ArrayList<>(); + timestamps = + 
writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + false, + new TimeFilterOperators.TimeGt(11000)); + + Assert.assertEquals(1, timestamps.length); + Assert.assertEquals(11001, timestamps[0]); + Assert.assertTrue(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertTrue(bitMaps.get(0).isMarked(2)); + } + + @Test + public void testNonAlignedWritableMemChunkRegionScan() { + MeasurementSchema measurementSchema = new MeasurementSchema("s1", TSDataType.INT32); + WritableMemChunk writableMemChunk = new WritableMemChunk(measurementSchema); + int size = 100000; + for (int i = 0; i < size; i++) { + writableMemChunk.writeNonAlignedPoint(i, i); + } + Optional<Long> timestamp = writableMemChunk.getAnySatisfiedTimestamp(null, null); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(0, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + null, new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L)); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(1000, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + Collections.singletonList(new TimeRange(1, 1500)), + new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L)); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(1501, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + Collections.singletonList(new TimeRange(1, 1500)), + new TimeFilterOperators.TimeBetweenAnd(100000L, 200000L)); + Assert.assertFalse(timestamp.isPresent()); + } +}
