This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 80850fb [IOTDB-1420] Fix compaction ttl bug (#3385)
80850fb is described below
commit 80850fb333c20085ac5a438bafee826c37446db5
Author: Jialin Qiao <[email protected]>
AuthorDate: Thu Jun 10 20:57:18 2021 -0500
[IOTDB-1420] Fix compaction ttl bug (#3385)
---
.../level/LevelCompactionTsFileManagement.java | 7 +
.../engine/compaction/utils/CompactionUtils.java | 280 +++++++++++----------
2 files changed, 149 insertions(+), 138 deletions(-)
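
The larger change below wraps the body of CompactionUtils.compact() in try/finally so that every cached TsFileSequenceReader is closed even when the merge throws part-way through. A minimal, self-contained sketch of that cleanup pattern follows; FileReader and mergeFiles() are hypothetical stand-ins for illustration, not the actual IoTDB types or method:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class CloseReadersInFinallySketch {

      // Hypothetical stand-in for a per-file reader such as TsFileSequenceReader.
      static class FileReader implements Closeable {
        private final String path;

        FileReader(String path) {
          this.path = path;
        }

        void read() throws IOException {
          // Reading can fail part-way; that must not leak the other open readers.
          System.out.println("reading " + path);
        }

        @Override
        public void close() throws IOException {
          System.out.println("closed " + path);
        }
      }

      // Mirrors the shape of the patched compact(): build a cache of readers,
      // do the merge work, and close every cached reader in a finally block.
      static void mergeFiles(String... paths) throws IOException {
        Map<String, FileReader> readerCache = new HashMap<>();
        try {
          for (String path : paths) {
            FileReader reader = readerCache.computeIfAbsent(path, FileReader::new);
            reader.read();
          }
          // ... write the merged target file here ...
        } finally {
          for (FileReader reader : readerCache.values()) {
            reader.close();
          }
        }
      }

      public static void main(String[] args) throws IOException {
        mergeFiles("seq-0-0.tsfile", "seq-1-0.tsfile");
      }
    }
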
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..afefcd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -660,6 +660,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
// log source file list and target file for recover
for (TsFileResource mergeResource : mergeResources.get(i)) {
+ mergeResource.setMerging(true);
compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
File newLevelFile =
@@ -844,6 +845,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
logAnalyzer.analyze();
String targetFilePath = logAnalyzer.getTargetFile();
+ List<String> sourceFileList = logAnalyzer.getSourceFiles();
+ boolean isSeq = logAnalyzer.isSeq();
+ for (String file : sourceFileList) {
+ TsFileResource fileResource = getTsFileResource(file, isSeq);
+ fileResource.setMerging(false);
+ }
if (targetFilePath != null) {
File targetFile = new File(targetFilePath);
if (targetFile.exists()) {
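
The LevelCompactionTsFileManagement change above marks each source file as merging when the compaction log is written and, during recovery, clears that flag for the source files read back from the log. A rough sketch of that bookkeeping, using a hypothetical Resource class and logCompaction/recover methods rather than the real TsFileResource and recovery code:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MergingFlagSketch {

      // Hypothetical stand-in for TsFileResource: a file path plus the merging flag.
      static class Resource {
        final String path;
        boolean merging;

        Resource(String path) {
          this.path = path;
        }
      }

      private final Map<String, Resource> knownResources = new HashMap<>();
      private final List<String> compactionLog = new ArrayList<>();

      // When a compaction task is logged, remember the source files and mark them as
      // merging (mirrors mergeResource.setMerging(true) next to compactionLogger.logFile).
      void logCompaction(List<Resource> sources) {
        for (Resource source : sources) {
          knownResources.put(source.path, source);
          source.merging = true;
          compactionLog.add(source.path);
        }
      }

      // On restart, replay the log and clear the flag on every logged source file
      // (mirrors the new loop over logAnalyzer.getSourceFiles() in the recovery path).
      void recover() {
        for (String path : compactionLog) {
          Resource source = knownResources.get(path);
          if (source != null) {
            source.merging = false;
          }
        }
      }
    }
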
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index a6aa68c..58e92ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -260,142 +260,92 @@ public class CompactionUtils {
       boolean sequence,
       List<Modification> modifications)
       throws IOException, IllegalPathException {
-    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
-    Map<String, List<Modification>> modificationCache = new HashMap<>();
-    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
-    Set<String> tsFileDevicesMap =
-        getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
-    for (String device : tsFileDevicesMap) {
-      if (devices.contains(device)) {
-        continue;
-      }
-      writer.startChunkGroup(device);
-      Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
-          new TreeMap<>(
-              (o1, o2) ->
-                  TsFileManagement.compareFileName(
-                      new File(o1.getFileName()), new File(o2.getFileName())));
-      Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
-          chunkMetadataListIteratorCache =
-              new TreeMap<>(
-                  (o1, o2) ->
-                      TsFileManagement.compareFileName(
-                          new File(o1.getFileName()), new File(o2.getFileName())));
-      for (TsFileResource tsFileResource : tsFileResources) {
-        TsFileSequenceReader reader =
-            buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
-        if (reader == null) {
-          throw new IOException();
-        }
-        Iterator<Map<String, List<ChunkMetadata>>> iterator =
-            reader.getMeasurementChunkMetadataListMapIterator(device);
-        chunkMetadataListIteratorCache.put(reader, iterator);
-        chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
-      }
-      while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
-        String lastSensor = null;
-        Set<String> allSensors = new HashSet<>();
-        for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-            chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-          TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-          Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-              chunkMetadataListCacheForMergeEntry.getValue();
-          if (sensorChunkMetadataListMap.size() <= 0) {
-            if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
-              sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
-              chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
-            } else {
-              continue;
-            }
-          }
-          // get the min last sensor in the current chunkMetadata cache list for merge
-          String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
-          if (lastSensor == null) {
-            lastSensor = maxSensor;
-          } else {
-            if (maxSensor.compareTo(lastSensor) < 0) {
-              lastSensor = maxSensor;
-            }
-          }
-          // get all sensor used later
-          allSensors.addAll(sensorChunkMetadataListMap.keySet());
+    try {
+      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+      Map<String, List<Modification>> modificationCache = new HashMap<>();
+      RateLimiter compactionWriteRateLimiter =
+          MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+      Set<String> tsFileDevicesMap =
+          getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
+      for (String device : tsFileDevicesMap) {
+        if (devices.contains(device)) {
+          continue;
         }
-
-        for (String sensor : allSensors) {
-          if (sensor.compareTo(lastSensor) <= 0) {
-            Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+        writer.startChunkGroup(device);
+        Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
+            new TreeMap<>(
+                (o1, o2) ->
+                    TsFileManagement.compareFileName(
+                        new File(o1.getFileName()), new File(o2.getFileName())));
+        Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
+            chunkMetadataListIteratorCache =
                new TreeMap<>(
                    (o1, o2) ->
                        TsFileManagement.compareFileName(
                            new File(o1.getFileName()), new File(o2.getFileName())));
-            // find all chunkMetadata of a sensor
-            for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-                chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-              TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-              Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-                  chunkMetadataListCacheForMergeEntry.getValue();
-              if (sensorChunkMetadataListMap.containsKey(sensor)) {
-                readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
-                sensorChunkMetadataListMap.remove(sensor);
+        for (TsFileResource tsFileResource : tsFileResources) {
+          TsFileSequenceReader reader =
+              buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+          if (reader == null) {
+            throw new IOException();
+          }
+          Iterator<Map<String, List<ChunkMetadata>>> iterator =
+              reader.getMeasurementChunkMetadataListMapIterator(device);
+          chunkMetadataListIteratorCache.put(reader, iterator);
+          chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
+        }
+        while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+          String lastSensor = null;
+          Set<String> allSensors = new HashSet<>();
+          for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+              chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+            TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+            Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                chunkMetadataListCacheForMergeEntry.getValue();
+            if (sensorChunkMetadataListMap.size() <= 0) {
+              if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
+                sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
+                chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
+              } else {
+                continue;
              }
            }
-            Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
-                sensorReaderChunkMetadataListEntry =
-                    new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
-            if (!sequence) {
-              writeByDeserializePageMerge(
-                  device,
-                  compactionWriteRateLimiter,
-                  sensorReaderChunkMetadataListEntry,
-                  targetResource,
-                  writer,
-                  modificationCache,
-                  modifications);
+            // get the min last sensor in the current chunkMetadata cache list for merge
+            String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
+            if (lastSensor == null) {
+              lastSensor = maxSensor;
            } else {
-              boolean isChunkEnoughLarge = true;
-              boolean isPageEnoughLarge = true;
-              for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
-                for (ChunkMetadata chunkMetadata : chunkMetadatas) {
-                  if (chunkMetadata.getNumOfPoints()
-                      < IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getMergePagePointNumberThreshold()) {
-                    isPageEnoughLarge = false;
-                  }
-                  if (chunkMetadata.getNumOfPoints()
-                      < IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getMergeChunkPointNumberThreshold()) {
-                    isChunkEnoughLarge = false;
-                  }
+              if (maxSensor.compareTo(lastSensor) < 0) {
+                lastSensor = maxSensor;
+              }
+            }
+            // get all sensor used later
+            allSensors.addAll(sensorChunkMetadataListMap.keySet());
+          }
+
+          for (String sensor : allSensors) {
+            if (sensor.compareTo(lastSensor) <= 0) {
+              Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+                  new TreeMap<>(
+                      (o1, o2) ->
+                          TsFileManagement.compareFileName(
+                              new File(o1.getFileName()), new File(o2.getFileName())));
+              // find all chunkMetadata of a sensor
+              for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+                  chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+                TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+                Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                    chunkMetadataListCacheForMergeEntry.getValue();
+                if (sensorChunkMetadataListMap.containsKey(sensor)) {
+                  readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
+                  sensorChunkMetadataListMap.remove(sensor);
                }
              }
-              // if a chunk is large enough, it's page must be large enough too
-              if (isChunkEnoughLarge) {
-                logger.debug(
-                    "{} [Compaction] chunk enough large, use append chunk merge", storageGroup);
-                // append page in chunks, so we do not have to deserialize a chunk
-                writeByAppendChunkMerge(
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer);
-              } else if (isPageEnoughLarge) {
-                logger.debug(
-                    "{} [Compaction] page enough large, use append page merge", storageGroup);
-                // append page in chunks, so we do not have to deserialize a chunk
-                writeByAppendPageMerge(
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer);
-              } else {
-                logger.debug(
-                    "{} [Compaction] page too small, use deserialize page merge", storageGroup);
-                // we have to deserialize chunks to merge pages
+              Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
+                  sensorReaderChunkMetadataListEntry =
+                      new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
+              if (!sequence) {
                writeByDeserializePageMerge(
                    device,
                    compactionWriteRateLimiter,
@@ -404,27 +354,81 @@ public class CompactionUtils {
                    writer,
                    modificationCache,
                    modifications);
+              } else {
+                boolean isChunkEnoughLarge = true;
+                boolean isPageEnoughLarge = true;
+                for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
+                  for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+                    if (chunkMetadata.getNumOfPoints()
+                        < IoTDBDescriptor.getInstance()
+                            .getConfig()
+                            .getMergePagePointNumberThreshold()) {
+                      isPageEnoughLarge = false;
+                    }
+                    if (chunkMetadata.getNumOfPoints()
+                        < IoTDBDescriptor.getInstance()
+                            .getConfig()
+                            .getMergeChunkPointNumberThreshold()) {
+                      isChunkEnoughLarge = false;
+                    }
+                  }
+                }
+                // if a chunk is large enough, it's page must be large enough too
+                if (isChunkEnoughLarge) {
+                  logger.debug(
+                      "{} [Compaction] chunk enough large, use append chunk merge", storageGroup);
+                  // append page in chunks, so we do not have to deserialize a chunk
+                  writeByAppendChunkMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer);
+                } else if (isPageEnoughLarge) {
+                  logger.debug(
+                      "{} [Compaction] page enough large, use append page merge", storageGroup);
+                  // append page in chunks, so we do not have to deserialize a chunk
+                  writeByAppendPageMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer);
+                } else {
+                  logger.debug(
+                      "{} [Compaction] page too small, use deserialize page merge", storageGroup);
+                  // we have to deserialize chunks to merge pages
+                  writeByDeserializePageMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer,
+                      modificationCache,
+                      modifications);
+                }
               }
             }
           }
         }
+        writer.endChunkGroup();
+        if (compactionLogger != null) {
+          compactionLogger.logDevice(device, writer.getPos());
+        }
       }
-      writer.endChunkGroup();
-      if (compactionLogger != null) {
-        compactionLogger.logDevice(device, writer.getPos());
-      }
-    }
-    for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
-      reader.close();
-    }
+      for (TsFileResource tsFileResource : tsFileResources) {
+        targetResource.updatePlanIndexes(tsFileResource);
+      }
+      targetResource.serialize();
+      writer.endFile();
+      targetResource.close();
-    for (TsFileResource tsFileResource : tsFileResources) {
-      targetResource.updatePlanIndexes(tsFileResource);
+    } finally {
+      for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
+        reader.close();
+      }
     }
-    targetResource.serialize();
-    writer.endFile();
-    targetResource.close();
   }
   private static TsFileSequenceReader buildReaderFromTsFileResource(