This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9be174b795b [to dev/1.3] Optimize memtable region scan #16891
9be174b795b is described below
commit 9be174b795b8f03c09a94fdc2635b117b9ce80f2
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 10 14:18:55 2025 +0800
[to dev/1.3] Optimize memtable region scan #16891
---
.../db/storageengine/dataregion/DataRegion.java | 6 +-
.../dataregion/memtable/AbstractMemTable.java | 85 ++++--
.../memtable/AlignedWritableMemChunk.java | 114 ++++++--
.../dataregion/memtable/IMemTable.java | 6 +-
.../dataregion/memtable/TsFileProcessor.java | 18 +-
.../dataregion/memtable/WritableMemChunk.java | 52 ++--
.../memtable/WritableMemChunkRegionScanTest.java | 299 +++++++++++++++++++++
7 files changed, 494 insertions(+), 86 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1856a5448b3..8ce2185d9ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2161,7 +2161,8 @@ public class DataRegion implements IDataRegionForQuery {
} else {
tsFileResource
.getProcessor()
- .queryForSeriesRegionScanWithoutLock(partialPaths, context,
fileScanHandles);
+ .queryForSeriesRegionScanWithoutLock(
+ partialPaths, context, fileScanHandles, globalTimeFilter);
}
}
return fileScanHandles;
@@ -2238,7 +2239,8 @@ public class DataRegion implements IDataRegionForQuery {
} else {
tsFileResource
.getProcessor()
- .queryForDeviceRegionScanWithoutLock(devicePathsToContext,
context, fileScanHandles);
+ .queryForDeviceRegionScanWithoutLock(
+ devicePathsToContext, context, fileScanHandles,
globalTimeFilter);
}
}
return fileScanHandles;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index dcae00700ed..9d1078a471e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -67,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -451,7 +452,8 @@ public abstract class AbstractMemTable implements IMemTable {
long ttlLowerBound,
Map<String, List<IChunkMetadata>> chunkMetaDataMap,
Map<String, List<IChunkHandle>> memChunkHandleMap,
- List<Pair<Modification, IMemTable>> modsToMemTabled) {
+ List<Pair<Modification, IMemTable>> modsToMemTabled,
+ Filter globalTimeFilter) {
IDeviceID deviceID =
DeviceIDFactory.getInstance().getDeviceID(fullPath.getDevicePath());
@@ -469,7 +471,12 @@ public abstract class AbstractMemTable implements IMemTable {
(MeasurementPath) fullPath, this, modsToMemTabled,
ttlLowerBound);
}
getMemChunkHandleFromMemTable(
- deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap,
deletionList);
+ deviceID,
+ measurementId,
+ chunkMetaDataMap,
+ memChunkHandleMap,
+ deletionList,
+ globalTimeFilter);
} else {
if (!memTableMap.containsKey(deviceID)) {
return;
@@ -486,7 +493,8 @@ public abstract class AbstractMemTable implements IMemTable {
((AlignedPath) fullPath).getSchemaList(),
chunkMetaDataMap,
memChunkHandleMap,
- deletionList);
+ deletionList,
+ globalTimeFilter);
}
}
@@ -497,7 +505,8 @@ public abstract class AbstractMemTable implements IMemTable {
long ttlLowerBound,
Map<String, List<IChunkMetadata>> chunkMetadataMap,
Map<String, List<IChunkHandle>> memChunkHandleMap,
- List<Pair<Modification, IMemTable>> modsToMemTabled)
+ List<Pair<Modification, IMemTable>> modsToMemTabled,
+ Filter globalTimeFilter)
throws IllegalPathException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = getMemTableMap();
@@ -515,7 +524,8 @@ public abstract class AbstractMemTable implements IMemTable {
chunkMetadataMap,
memChunkHandleMap,
ttlLowerBound,
- modsToMemTabled);
+ modsToMemTabled,
+ globalTimeFilter);
} else {
getMemChunkHandleFromMemTable(
deviceID,
@@ -523,7 +533,8 @@ public abstract class AbstractMemTable implements IMemTable {
chunkMetadataMap,
memChunkHandleMap,
ttlLowerBound,
- modsToMemTabled);
+ modsToMemTabled,
+ globalTimeFilter);
}
}
@@ -532,24 +543,30 @@ public abstract class AbstractMemTable implements IMemTable {
String measurementId,
Map<String, List<IChunkMetadata>> chunkMetadataMap,
Map<String, List<IChunkHandle>> memChunkHandleMap,
- List<TimeRange> deletionList) {
+ List<TimeRange> deletionList,
+ Filter globalTimeFilter) {
WritableMemChunk memChunk =
(WritableMemChunk)
memTableMap.get(deviceID).getMemChunkMap().get(measurementId);
- long[] timestamps = memChunk.getFilteredTimestamp(deletionList);
+ if (memChunk == null) {
+ return;
+ }
+ Optional<Long> anySatisfiedTimestamp =
+ memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
+ if (!anySatisfiedTimestamp.isPresent()) {
+ return;
+ }
+ long satisfiedTimestamp = anySatisfiedTimestamp.get();
chunkMetadataMap
.computeIfAbsent(measurementId, k -> new ArrayList<>())
.add(
- buildChunkMetaDataForMemoryChunk(
- measurementId,
- timestamps[0],
- timestamps[timestamps.length - 1],
- Collections.emptyList()));
+ buildFakeChunkMetaDataForFakeMemoryChunk(
+ measurementId, satisfiedTimestamp, satisfiedTimestamp,
Collections.emptyList()));
memChunkHandleMap
.computeIfAbsent(measurementId, k -> new ArrayList<>())
- .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
+ .add(new MemChunkHandleImpl(deviceID, measurementId, new long[]
{satisfiedTimestamp}));
}
private void getMemAlignedChunkHandleFromMemTable(
@@ -557,7 +574,8 @@ public abstract class AbstractMemTable implements IMemTable {
List<IMeasurementSchema> schemaList,
Map<String, List<IChunkMetadata>> chunkMetadataList,
Map<String, List<IChunkHandle>> memChunkHandleMap,
- List<List<TimeRange>> deletionList) {
+ List<List<TimeRange>> deletionList,
+ Filter globalTimeFilter) {
AlignedWritableMemChunk alignedMemChunk =
((AlignedWritableMemChunkGroup)
memTableMap.get(deviceID)).getAlignedMemChunk();
@@ -574,7 +592,11 @@ public abstract class AbstractMemTable implements IMemTable {
}
List<BitMap> bitMaps = new ArrayList<>();
- long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList,
bitMaps);
+ long[] timestamps =
+ alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps,
globalTimeFilter);
+ if (timestamps.length == 0) {
+ return;
+ }
buildAlignedMemChunkHandle(
deviceID,
@@ -592,7 +614,8 @@ public abstract class AbstractMemTable implements IMemTable {
Map<String, List<IChunkMetadata>> chunkMetadataList,
Map<String, List<IChunkHandle>> memChunkHandleMap,
long ttlLowerBound,
- List<Pair<Modification, IMemTable>> modsToMemTabled)
+ List<Pair<Modification, IMemTable>> modsToMemTabled,
+ Filter globalTimeFilter)
throws IllegalPathException {
AlignedWritableMemChunk memChunk =
writableMemChunkGroup.getAlignedMemChunk();
@@ -611,7 +634,10 @@ public abstract class AbstractMemTable implements IMemTable {
}
List<BitMap> bitMaps = new ArrayList<>();
- long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps);
+ long[] timestamps = memChunk.getAnySatisfiedTimestamp(deletionList,
bitMaps, globalTimeFilter);
+ if (timestamps.length == 0) {
+ return;
+ }
buildAlignedMemChunkHandle(
deviceID,
timestamps,
@@ -628,7 +654,8 @@ public abstract class AbstractMemTable implements IMemTable {
Map<String, List<IChunkMetadata>> chunkMetadataMap,
Map<String, List<IChunkHandle>> memChunkHandleMap,
long ttlLowerBound,
- List<Pair<Modification, IMemTable>> modsToMemTabled)
+ List<Pair<Modification, IMemTable>> modsToMemTabled,
+ Filter globalTimeFilter)
throws IllegalPathException {
for (Entry<String, IWritableMemChunk> entry :
@@ -646,18 +673,20 @@ public abstract class AbstractMemTable implements IMemTable {
modsToMemTabled,
ttlLowerBound);
}
- long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList);
+ Optional<Long> anySatisfiedTimestamp =
+ writableMemChunk.getAnySatisfiedTimestamp(deletionList,
globalTimeFilter);
+ if (!anySatisfiedTimestamp.isPresent()) {
+ return;
+ }
+ long satisfiedTimestamp = anySatisfiedTimestamp.get();
chunkMetadataMap
.computeIfAbsent(measurementId, k -> new ArrayList<>())
.add(
- buildChunkMetaDataForMemoryChunk(
- measurementId,
- timestamps[0],
- timestamps[timestamps.length - 1],
- Collections.emptyList()));
+ buildFakeChunkMetaDataForFakeMemoryChunk(
+ measurementId, satisfiedTimestamp, satisfiedTimestamp,
Collections.emptyList()));
memChunkHandleMap
.computeIfAbsent(measurementId, k -> new ArrayList<>())
- .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
+ .add(new MemChunkHandleImpl(deviceID, measurementId, new long[]
{satisfiedTimestamp}));
}
}
@@ -681,7 +710,7 @@ public abstract class AbstractMemTable implements IMemTable {
chunkMetadataList
.computeIfAbsent(measurement, k -> new ArrayList<>())
.add(
- buildChunkMetaDataForMemoryChunk(
+ buildFakeChunkMetaDataForFakeMemoryChunk(
measurement, startEndTime[0], startEndTime[1], deletion));
chunkHandleMap
.computeIfAbsent(measurement, k -> new ArrayList<>())
@@ -712,7 +741,7 @@ public abstract class AbstractMemTable implements IMemTable {
return new long[] {startTime, endTime};
}
- private IChunkMetadata buildChunkMetaDataForMemoryChunk(
+ private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk(
String measurement, long startTime, long endTime, List<TimeRange>
deletionList) {
TimeStatistics timeStatistics = new TimeStatistics();
timeStatistics.setStartTime(startTime);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index bca23f4df1f..88d55e5bdf8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
@@ -52,6 +53,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
@@ -712,29 +714,89 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
return new Pair<>(reorderedColumnValues, reorderedBitMaps);
}
- private void filterDeletedTimeStamp(
+ public long[] getAnySatisfiedTimestamp(
+ List<List<TimeRange>> deletionList, List<BitMap> bitMaps, Filter
globalTimeFilter) {
+ BitMap columnHasNonNullValue = new BitMap(schemaList.size());
+ AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0);
+ Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+
+ getAnySatisfiedTimestamp(
+ list,
+ deletionList,
+ timestampWithBitmap,
+ globalTimeFilter,
+ columnHasNonNullValue,
+ hasNonNullValueColumnCount);
+ for (int i = 0;
+ i < sortedList.size() && hasNonNullValueColumnCount.get() <
schemaList.size();
+ i++) {
+ getAnySatisfiedTimestamp(
+ sortedList.get(i),
+ deletionList,
+ timestampWithBitmap,
+ globalTimeFilter,
+ columnHasNonNullValue,
+ hasNonNullValueColumnCount);
+ }
+
+ long[] timestamps = new long[timestampWithBitmap.size()];
+ int idx = 0;
+ for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
+ timestamps[idx++] = entry.getKey();
+ bitMaps.add(entry.getValue());
+ }
+ return timestamps;
+ }
+
+ private void getAnySatisfiedTimestamp(
AlignedTVList alignedTVList,
List<List<TimeRange>> valueColumnsDeletionList,
- Map<Long, BitMap> timestampWithBitmap) {
+ Map<Long, BitMap> timestampWithBitmap,
+ Filter globalTimeFilter,
+ BitMap columnHasNonNullValue,
+ AtomicInteger hasNonNullValueColumnCount) {
+ if (globalTimeFilter != null
+ && !globalTimeFilter.satisfyStartEndTime(
+ alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
+ return;
+ }
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
-
int rowCount = alignedTVList.rowCount();
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
if (valueColumnsDeletionList != null) {
valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new
int[] {0}));
}
+ // example:
+ // globalTimeFilter:null, ignoreAllNullRows: true
+ // tvList:
+ // time s1 s2 s3
+ // 1 1 null null
+ // 2 null 1 null
+ // 2 1 1 null
+ // 3 1 null null
+ // 4 1 null 1
+ // timestampWithBitmap:
+ // timestamp: 1 bitmap: 011
+ // timestamp: 2 bitmap: 101
+ // timestamp: 4 bitmap: 110
for (int row = 0; row < rowCount; row++) {
// the row is deleted
if (allValueColDeletedMap != null &&
allValueColDeletedMap.isMarked(row)) {
continue;
}
long timestamp = alignedTVList.getTime(row);
+ if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp,
null)) {
+ continue;
+ }
+
+ // Note that this method will only perform bitmap unmarking on the first occurrence of a
+ // non-null value in multiple timestamps for the same column.
+ BitMap currentRowNullValueBitmap = null;
- BitMap bitMap = new BitMap(schemaList.size());
for (int column = 0; column < schemaList.size(); column++) {
if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row),
column)) {
- bitMap.mark(column);
+ continue;
}
// skip deleted row
@@ -744,32 +806,36 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
timestamp,
valueColumnsDeletionList.get(column),
valueColumnDeleteCursor.get(column))) {
- bitMap.mark(column);
- }
-
- // skip all-null row
- if (bitMap.isAllMarked()) {
continue;
}
- timestampWithBitmap.put(timestamp, bitMap);
+ if (!columnHasNonNullValue.isMarked(column)) {
+ hasNonNullValueColumnCount.incrementAndGet();
+ columnHasNonNullValue.mark(column);
+ currentRowNullValueBitmap =
+ currentRowNullValueBitmap != null
+ ? currentRowNullValueBitmap
+ : timestampWithBitmap.computeIfAbsent(
+ timestamp, k -> getAllMarkedBitmap(schemaList.size()));
+ currentRowNullValueBitmap.unmark(column);
+ }
}
- }
- }
- public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList,
List<BitMap> bitMaps) {
- Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+ if (currentRowNullValueBitmap == null) {
+ continue;
+ }
+ // found new column with non-null value
+ timestampWithBitmap.put(timestamp, currentRowNullValueBitmap);
- filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
- for (AlignedTVList alignedTVList : sortedList) {
- filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
+ if (hasNonNullValueColumnCount.get() == schemaList.size()) {
+ return;
+ }
}
+ }
- List<Long> filteredTimestamps = new ArrayList<>();
- for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
- filteredTimestamps.add(entry.getKey());
- bitMaps.add(entry.getValue());
- }
- return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
+ private BitMap getAllMarkedBitmap(int size) {
+ BitMap bitMap = new BitMap(size);
+ bitMap.markAll();
+ return bitMap;
}
// Choose maximum avgPointSizeOfLargestColumn among working and sorted AlignedTVList as
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 3a7e9c55092..cd1e9f14432 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -125,7 +125,8 @@ public interface IMemTable extends WALEntryValue {
long ttlLowerBound,
Map<String, List<IChunkMetadata>> chunkMetadataMap,
Map<String, List<IChunkHandle>> memChunkHandleMap,
- List<Pair<Modification, IMemTable>> modsToMemtabled)
+ List<Pair<Modification, IMemTable>> modsToMemtabled,
+ Filter globalTimeFilter)
throws IOException, QueryProcessException, MetadataException;
void queryForDeviceRegionScan(
@@ -134,7 +135,8 @@ public interface IMemTable extends WALEntryValue {
long ttlLowerBound,
Map<String, List<IChunkMetadata>> chunkMetadataMap,
Map<String, List<IChunkHandle>> memChunkHandleMap,
- List<Pair<Modification, IMemTable>> modsToMemtabled)
+ List<Pair<Modification, IMemTable>> modsToMemtabled,
+ Filter globalTimeFilter)
throws IOException, QueryProcessException, MetadataException;
/** putBack all the memory resources. */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index a508ec48a88..6bbcb427f87 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1826,7 +1826,8 @@ public class TsFileProcessor {
public void queryForSeriesRegionScanWithoutLock(
List<PartialPath> pathList,
QueryContext queryContext,
- List<IFileScanHandle> fileScanHandlesForQuery) {
+ List<IFileScanHandle> fileScanHandlesForQuery,
+ Filter globalTimeFilter) {
long startTime = System.nanoTime();
try {
Map<IDeviceID, Map<String, List<IChunkHandle>>>
deviceToMemChunkHandleMap = new HashMap<>();
@@ -1846,7 +1847,8 @@ public class TsFileProcessor {
timeLowerBound,
measurementToChunkMetaList,
measurementToChunkHandleList,
- modsToMemtable);
+ modsToMemtable,
+ globalTimeFilter);
}
if (workMemTable != null) {
workMemTable.queryForSeriesRegionScan(
@@ -1854,7 +1856,8 @@ public class TsFileProcessor {
timeLowerBound,
measurementToChunkMetaList,
measurementToChunkHandleList,
- null);
+ null,
+ globalTimeFilter);
}
IDeviceID deviceID =
DeviceIDFactory.getInstance().getDeviceID(seriesPath.getDevice());
// Some memTable have been flushed already, so we need to get the chunk metadata from
@@ -1905,7 +1908,8 @@ public class TsFileProcessor {
public void queryForDeviceRegionScanWithoutLock(
Map<IDeviceID, DeviceContext> devicePathsToContext,
QueryContext queryContext,
- List<IFileScanHandle> fileScanHandlesForQuery) {
+ List<IFileScanHandle> fileScanHandlesForQuery,
+ Filter globalTimeFilter) {
long startTime = System.nanoTime();
try {
Map<IDeviceID, Map<String, List<IChunkHandle>>>
deviceToMemChunkHandleMap = new HashMap<>();
@@ -1930,7 +1934,8 @@ public class TsFileProcessor {
timeLowerBound,
measurementToChunkMetadataList,
measurementToMemChunkHandleList,
- modsToMemtable);
+ modsToMemtable,
+ globalTimeFilter);
}
if (workMemTable != null) {
workMemTable.queryForDeviceRegionScan(
@@ -1939,7 +1944,8 @@ public class TsFileProcessor {
timeLowerBound,
measurementToChunkMetadataList,
measurementToMemChunkHandleList,
- null);
+ null,
+ globalTimeFilter);
}
buildChunkHandleForFlushedMemTable(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 01e91387612..c19ad4f5a22 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -43,10 +44,9 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
-import java.util.stream.Collectors;
import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
@@ -554,11 +554,30 @@ public class WritableMemChunk extends AbstractWritableMemChunk {
return sortedList;
}
- private void filterDeletedTimestamp(
- TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
- long lastTime = Long.MIN_VALUE;
+ public Optional<Long> getAnySatisfiedTimestamp(
+ List<TimeRange> deletionList, Filter globalTimeFilter) {
+ Optional<Long> anySatisfiedTimestamp =
+ getAnySatisfiedTimestamp(list, deletionList, globalTimeFilter);
+ if (anySatisfiedTimestamp.isPresent()) {
+ return anySatisfiedTimestamp;
+ }
+ for (TVList tvList : sortedList) {
+ anySatisfiedTimestamp = getAnySatisfiedTimestamp(tvList, deletionList,
globalTimeFilter);
+ if (anySatisfiedTimestamp.isPresent()) {
+ break;
+ }
+ }
+ return anySatisfiedTimestamp;
+ }
+
+ private Optional<Long> getAnySatisfiedTimestamp(
+ TVList tvlist, List<TimeRange> deletionList, Filter globalTimeFilter) {
int[] deletionCursor = {0};
int rowCount = tvlist.rowCount();
+ if (globalTimeFilter != null
+ && !globalTimeFilter.satisfyStartEndTime(tvlist.getMinTime(),
tvlist.getMaxTime())) {
+ return Optional.empty();
+ }
for (int i = 0; i < rowCount; i++) {
if (tvlist.getBitMap() != null &&
tvlist.isNullValue(tvlist.getValueIndex(i))) {
continue;
@@ -568,26 +587,11 @@ public class WritableMemChunk extends AbstractWritableMemChunk {
&& ModificationUtils.isPointDeleted(curTime, deletionList,
deletionCursor)) {
continue;
}
-
- if (i == rowCount - 1 || curTime != lastTime) {
- timestampList.add(curTime);
+ if (globalTimeFilter != null && !globalTimeFilter.satisfy(curTime,
null)) {
+ continue;
}
- lastTime = curTime;
+ return Optional.of(curTime);
}
- }
-
- public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
- List<Long> timestampList = new ArrayList<>();
- filterDeletedTimestamp(list, deletionList, timestampList);
- for (TVList tvList : sortedList) {
- filterDeletedTimestamp(tvList, deletionList, timestampList);
- }
-
- // remove duplicated time
- List<Long> distinctTimestamps =
timestampList.stream().distinct().collect(Collectors.toList());
- // sort timestamps
- long[] filteredTimestamps =
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
- Arrays.sort(filteredTimestamps);
- return filteredTimestamps;
+ return Optional.empty();
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
new file mode 100644
index 00000000000..63bbac18e38
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.memtable;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class WritableMemChunkRegionScanTest {
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {{0}, {1000}, {10000}, {20000}});
+ }
+
+ private int defaultTvListThreshold;
+ private int tvListSortThreshold;
+
+ public WritableMemChunkRegionScanTest(int tvListSortThreshold) {
+ this.tvListSortThreshold = tvListSortThreshold;
+ }
+
+ @Before
+ public void setup() {
+ defaultTvListThreshold =
IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold();
+
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(tvListSortThreshold);
+ }
+
+ @After
+ public void tearDown() {
+
IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(defaultTvListThreshold);
+ }
+
+ @Test
+ public void testAlignedWritableMemChunkRegionScan() throws
IllegalPathException {
+ PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+ try {
+ List<IMeasurementSchema> measurementSchemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32));
+ AlignedWritableMemChunk writableMemChunk = null;
+ int size = 100000;
+ for (int i = 0; i < size; i++) {
+ if (i <= 10000) {
+ memTable.writeAlignedRow(
+ new PlainDeviceID("root.test.d1"), measurementSchemas, i, new
Object[] {1, null, 1});
+ } else if (i <= 20000) {
+ memTable.writeAlignedRow(
+ new PlainDeviceID("root.test.d1"),
+ measurementSchemas,
+ i,
+ new Object[] {null, null, 2});
+ } else if (i <= 30000) {
+ memTable.writeAlignedRow(
+ new PlainDeviceID("root.test.d1"),
+ measurementSchemas,
+ i,
+ new Object[] {3, null, null});
+ } else {
+ memTable.writeAlignedRow(
+ new PlainDeviceID("root.test.d1"), measurementSchemas, i, new
Object[] {4, 4, 4});
+ }
+ }
+ writableMemChunk =
+ (AlignedWritableMemChunk)
+ memTable.getWritableMemChunk(new PlainDeviceID("root.test.d1"),
"");
+ List<BitMap> bitMaps = new ArrayList<>();
+ long[] timestamps =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ Arrays.asList(
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
+ bitMaps,
+ null);
+ Assert.assertEquals(2, timestamps.length);
+ Assert.assertEquals(0, timestamps[0]);
+ Assert.assertFalse(bitMaps.get(0).isMarked(0));
+ Assert.assertTrue(bitMaps.get(0).isMarked(1));
+ Assert.assertFalse(bitMaps.get(0).isMarked(2));
+ Assert.assertTrue(bitMaps.get(1).isMarked(0));
+ Assert.assertFalse(bitMaps.get(1).isMarked(1));
+ Assert.assertTrue(bitMaps.get(1).isMarked(2));
+ Assert.assertEquals(30001, timestamps[1]);
+
+ bitMaps = new ArrayList<>();
+ timestamps =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ Arrays.asList(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(new TimeRange(0, 12000))),
+ bitMaps,
+ new TimeFilterOperators.TimeGt(10000000));
+ Assert.assertEquals(0, timestamps.length);
+
+ bitMaps = new ArrayList<>();
+ timestamps =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ Arrays.asList(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(new TimeRange(0, 12000))),
+ bitMaps,
+ new TimeFilterOperators.TimeGt(11000));
+
+ Assert.assertEquals(3, timestamps.length);
+ Assert.assertEquals(12001, timestamps[0]);
+ Assert.assertTrue(bitMaps.get(0).isMarked(0));
+ Assert.assertTrue(bitMaps.get(0).isMarked(1));
+ Assert.assertFalse(bitMaps.get(0).isMarked(2));
+ Assert.assertEquals(20001, timestamps[1]);
+ Assert.assertFalse(bitMaps.get(1).isMarked(0));
+ Assert.assertTrue(bitMaps.get(1).isMarked(1));
+ Assert.assertTrue(bitMaps.get(1).isMarked(2));
+ Assert.assertEquals(30001, timestamps[2]);
+ Assert.assertTrue(bitMaps.get(2).isMarked(0));
+ Assert.assertFalse(bitMaps.get(2).isMarked(1));
+ Assert.assertTrue(bitMaps.get(2).isMarked(2));
+
+ writableMemChunk.writeAlignedPoints(
+ 1000001, new Object[] {1, null, null}, measurementSchemas);
+ writableMemChunk.writeAlignedPoints(
+ 1000002, new Object[] {null, 1, null}, measurementSchemas);
+ writableMemChunk.writeAlignedPoints(1000002, new Object[] {1, 1, null},
measurementSchemas);
+ writableMemChunk.writeAlignedPoints(
+ 1000003, new Object[] {1, null, null}, measurementSchemas);
+ writableMemChunk.writeAlignedPoints(1000004, new Object[] {1, null, 1},
measurementSchemas);
+ bitMaps = new ArrayList<>();
+ timestamps =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ Arrays.asList(
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
+ bitMaps,
+ new TimeFilterOperators.TimeGt(1000000));
+ Assert.assertEquals(3, timestamps.length);
+ Assert.assertEquals(1000001, timestamps[0]);
+ Assert.assertFalse(bitMaps.get(0).isMarked(0));
+ Assert.assertTrue(bitMaps.get(0).isMarked(1));
+ Assert.assertTrue(bitMaps.get(0).isMarked(2));
+ Assert.assertEquals(1000002, timestamps[1]);
+ Assert.assertTrue(bitMaps.get(1).isMarked(0));
+ Assert.assertFalse(bitMaps.get(1).isMarked(1));
+ Assert.assertTrue(bitMaps.get(1).isMarked(2));
+ Assert.assertEquals(1000004, timestamps[2]);
+ Assert.assertTrue(bitMaps.get(2).isMarked(0));
+ Assert.assertTrue(bitMaps.get(2).isMarked(1));
+ Assert.assertFalse(bitMaps.get(2).isMarked(2));
+
+ Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>();
+ memTable.queryForDeviceRegionScan(
+ new PlainDeviceID("root.test.d1"),
+ true,
+ Long.MIN_VALUE,
+ new HashMap<>(),
+ chunkHandleMap,
+ Collections.emptyList(),
+ new TimeFilterOperators.TimeGt(1000000));
+ Assert.assertEquals(3, chunkHandleMap.size());
+ Assert.assertArrayEquals(
+ new long[] {1000001, 1000001},
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+ Assert.assertArrayEquals(
+ new long[] {1000002, 1000002},
chunkHandleMap.get("s2").get(0).getPageStatisticsTime());
+ Assert.assertArrayEquals(
+ new long[] {1000004, 1000004},
chunkHandleMap.get("s3").get(0).getPageStatisticsTime());
+
+ memTable.queryForSeriesRegionScan(
+ new AlignedPath(
+ "root.test.d1",
+ measurementSchemas.stream()
+ .map(IMeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas),
+ Long.MIN_VALUE,
+ new HashMap<>(),
+ chunkHandleMap,
+ Collections.emptyList(),
+ new TimeFilterOperators.TimeGt(1000000));
+ Assert.assertEquals(3, chunkHandleMap.size());
+ Assert.assertArrayEquals(
+ new long[] {1000001, 1000001},
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+ Assert.assertArrayEquals(
+ new long[] {1000002, 1000002},
chunkHandleMap.get("s2").get(0).getPageStatisticsTime());
+ Assert.assertArrayEquals(
+ new long[] {1000004, 1000004},
chunkHandleMap.get("s3").get(0).getPageStatisticsTime());
+ } finally {
+ memTable.release();
+ }
+ }
+
+ @Test
+ public void testNonAlignedWritableMemChunkRegionScan() throws
IllegalPathException {
+ PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+ try {
+ MeasurementSchema measurementSchema = new MeasurementSchema("s1",
TSDataType.INT32);
+ int size = 100000;
+ for (int i = 0; i < size; i++) {
+ memTable.write(
+ new PlainDeviceID("root.test.d1"),
+ Collections.singletonList(measurementSchema),
+ i,
+ new Object[] {i});
+ }
+ WritableMemChunk writableMemChunk =
+ (WritableMemChunk) memTable.getWritableMemChunk(new
PlainDeviceID("root.test.d1"), "s1");
+ Optional<Long> timestamp =
writableMemChunk.getAnySatisfiedTimestamp(null, null);
+ Assert.assertTrue(timestamp.isPresent());
+ Assert.assertEquals(0, timestamp.get().longValue());
+
+ timestamp =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ null, new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L));
+ Assert.assertTrue(timestamp.isPresent());
+ Assert.assertEquals(1000, timestamp.get().longValue());
+
+ timestamp =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ Collections.singletonList(new TimeRange(1, 1500)),
+ new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L));
+ Assert.assertTrue(timestamp.isPresent());
+ Assert.assertEquals(1501, timestamp.get().longValue());
+
+ timestamp =
+ writableMemChunk.getAnySatisfiedTimestamp(
+ Collections.singletonList(new TimeRange(1, 1500)),
+ new TimeFilterOperators.TimeBetweenAnd(100000L, 200000L));
+ Assert.assertFalse(timestamp.isPresent());
+
+ Map<String, List<IChunkHandle>> chunkHandleMap = new HashMap<>();
+ memTable.queryForDeviceRegionScan(
+ new PlainDeviceID("root.test.d1"),
+ false,
+ Long.MIN_VALUE,
+ new HashMap<>(),
+ chunkHandleMap,
+ Collections.emptyList(),
+ new TimeFilterOperators.TimeGt(1));
+ Assert.assertEquals(1, chunkHandleMap.size());
+ Assert.assertArrayEquals(
+ new long[] {2, 2},
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+ memTable.queryForSeriesRegionScan(
+ new MeasurementPath("root.test.d1", "s1", measurementSchema),
+ Long.MIN_VALUE,
+ new HashMap<>(),
+ chunkHandleMap,
+ Collections.emptyList(),
+ new TimeFilterOperators.TimeGt(1));
+ Assert.assertEquals(1, chunkHandleMap.size());
+ Assert.assertArrayEquals(
+ new long[] {2, 2},
chunkHandleMap.get("s1").get(0).getPageStatisticsTime());
+ } finally {
+ memTable.release();
+ }
+ }
+}