This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_compaction_ttl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 21f566b07660c58ead6162f4e6dcc3ba92a5bd86 Author: qiaojialin <[email protected]> AuthorDate: Thu Jun 10 17:09:45 2021 +0800 fix compaction ttl bug --- .../level/LevelCompactionTsFileManagement.java | 7 + .../engine/compaction/utils/CompactionUtils.java | 284 +++++++++++---------- 2 files changed, 151 insertions(+), 140 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java index 98f1c2e..afefcd5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java @@ -660,6 +660,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName); // log source file list and target file for recover for (TsFileResource mergeResource : mergeResources.get(i)) { + mergeResource.setMerging(true); compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); } File newLevelFile = @@ -844,6 +845,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile); logAnalyzer.analyze(); String targetFilePath = logAnalyzer.getTargetFile(); + List<String> sourceFileList = logAnalyzer.getSourceFiles(); + boolean isSeq = logAnalyzer.isSeq(); + for (String file : sourceFileList) { + TsFileResource fileResource = getTsFileResource(file, isSeq); + fileResource.setMerging(false); + } if (targetFilePath != null) { File targetFile = new File(targetFilePath); if (targetFile.exists()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java index a6aa68c..22705af 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java @@ -260,142 +260,92 @@ public class CompactionUtils { boolean sequence, List<Modification> modifications) throws IOException, IllegalPathException { - RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile()); Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>(); - Map<String, List<Modification>> modificationCache = new HashMap<>(); - RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter(); - Set<String> tsFileDevicesMap = - getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup); - for (String device : tsFileDevicesMap) { - if (devices.contains(device)) { - continue; - } - writer.startChunkGroup(device); - Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge = - new TreeMap<>( - (o1, o2) -> - TsFileManagement.compareFileName( - new File(o1.getFileName()), new File(o2.getFileName()))); - Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>> - chunkMetadataListIteratorCache = - new TreeMap<>( - (o1, o2) -> - TsFileManagement.compareFileName( - new File(o1.getFileName()), new File(o2.getFileName()))); - for (TsFileResource tsFileResource : tsFileResources) { - TsFileSequenceReader reader = - buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup); - if (reader == null) { - throw new IOException(); + try { + RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile()); + Map<String, List<Modification>> modificationCache = new HashMap<>(); + RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE() + .getMergeWriteRateLimiter(); + Set<String> tsFileDevicesMap = + getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup); + for (String device : tsFileDevicesMap) { + if (devices.contains(device)) { + continue; } - Iterator<Map<String, List<ChunkMetadata>>> iterator = - reader.getMeasurementChunkMetadataListMapIterator(device); - chunkMetadataListIteratorCache.put(reader, iterator); - chunkMetadataListCacheForMerge.put(reader, new TreeMap<>()); - } - while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) { - String lastSensor = null; - Set<String> allSensors = new HashSet<>(); - for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> - chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) { - TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey(); - Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap = - chunkMetadataListCacheForMergeEntry.getValue(); - if (sensorChunkMetadataListMap.size() <= 0) { - if (chunkMetadataListIteratorCache.get(reader).hasNext()) { - sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next(); - chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap); - } else { - continue; - } - } - // get the min last sensor in the current chunkMetadata cache list for merge - String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet()); - if (lastSensor == null) { - lastSensor = maxSensor; - } else { - if (maxSensor.compareTo(lastSensor) < 0) { - lastSensor = maxSensor; - } + writer.startChunkGroup(device); + Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge = + new TreeMap<>( + (o1, o2) -> + TsFileManagement.compareFileName( + new File(o1.getFileName()), new File(o2.getFileName()))); + Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>> + chunkMetadataListIteratorCache = + new TreeMap<>( + (o1, o2) -> + TsFileManagement.compareFileName( + new File(o1.getFileName()), new File(o2.getFileName()))); + for (TsFileResource tsFileResource : tsFileResources) { + TsFileSequenceReader reader = + buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup); + if (reader == null) { + throw new IOException(); } - // get all sensor used later - allSensors.addAll(sensorChunkMetadataListMap.keySet()); + Iterator<Map<String, List<ChunkMetadata>>> iterator = + reader.getMeasurementChunkMetadataListMapIterator(device); + chunkMetadataListIteratorCache.put(reader, iterator); + chunkMetadataListCacheForMerge.put(reader, new TreeMap<>()); } - - for (String sensor : allSensors) { - if (sensor.compareTo(lastSensor) <= 0) { - Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap = - new TreeMap<>( - (o1, o2) -> - TsFileManagement.compareFileName( - new File(o1.getFileName()), new File(o2.getFileName()))); - // find all chunkMetadata of a sensor - for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> - chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) { - TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey(); - Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap = - chunkMetadataListCacheForMergeEntry.getValue(); - if (sensorChunkMetadataListMap.containsKey(sensor)) { - readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor)); - sensorChunkMetadataListMap.remove(sensor); + while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) { + String lastSensor = null; + Set<String> allSensors = new HashSet<>(); + for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> + chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) { + TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey(); + Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap = + chunkMetadataListCacheForMergeEntry.getValue(); + if (sensorChunkMetadataListMap.size() <= 0) { + if (chunkMetadataListIteratorCache.get(reader).hasNext()) { + sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next(); + chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap); + } else { + continue; } } - Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> - sensorReaderChunkMetadataListEntry = - new DefaultMapEntry<>(sensor, readerChunkMetadataListMap); - if (!sequence) { - writeByDeserializePageMerge( - device, - compactionWriteRateLimiter, - sensorReaderChunkMetadataListEntry, - targetResource, - writer, - modificationCache, - modifications); + // get the min last sensor in the current chunkMetadata cache list for merge + String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet()); + if (lastSensor == null) { + lastSensor = maxSensor; } else { - boolean isChunkEnoughLarge = true; - boolean isPageEnoughLarge = true; - for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) { - for (ChunkMetadata chunkMetadata : chunkMetadatas) { - if (chunkMetadata.getNumOfPoints() - < IoTDBDescriptor.getInstance() - .getConfig() - .getMergePagePointNumberThreshold()) { - isPageEnoughLarge = false; - } - if (chunkMetadata.getNumOfPoints() - < IoTDBDescriptor.getInstance() - .getConfig() - .getMergeChunkPointNumberThreshold()) { - isChunkEnoughLarge = false; - } + if (maxSensor.compareTo(lastSensor) < 0) { + lastSensor = maxSensor; + } + } + // get all sensor used later + allSensors.addAll(sensorChunkMetadataListMap.keySet()); + } + + for (String sensor : allSensors) { + if (sensor.compareTo(lastSensor) <= 0) { + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap = + new TreeMap<>( + (o1, o2) -> + TsFileManagement.compareFileName( + new File(o1.getFileName()), new File(o2.getFileName()))); + // find all chunkMetadata of a sensor + for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> + chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) { + TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey(); + Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap = + chunkMetadataListCacheForMergeEntry.getValue(); + if (sensorChunkMetadataListMap.containsKey(sensor)) { + readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor)); + sensorChunkMetadataListMap.remove(sensor); } } - // if a chunk is large enough, it's page must be large enough too - if (isChunkEnoughLarge) { - logger.debug( - "{} [Compaction] chunk enough large, use append chunk merge", storageGroup); - // append page in chunks, so we do not have to deserialize a chunk - writeByAppendChunkMerge( - device, - compactionWriteRateLimiter, - sensorReaderChunkMetadataListEntry, - targetResource, - writer); - } else if (isPageEnoughLarge) { - logger.debug( - "{} [Compaction] page enough large, use append page merge", storageGroup); - // append page in chunks, so we do not have to deserialize a chunk - writeByAppendPageMerge( - device, - compactionWriteRateLimiter, - sensorReaderChunkMetadataListEntry, - targetResource, - writer); - } else { - logger.debug( - "{} [Compaction] page too small, use deserialize page merge", storageGroup); - // we have to deserialize chunks to merge pages + Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> + sensorReaderChunkMetadataListEntry = + new DefaultMapEntry<>(sensor, readerChunkMetadataListMap); + if (!sequence) { writeByDeserializePageMerge( device, compactionWriteRateLimiter, @@ -404,27 +354,81 @@ public class CompactionUtils { writer, modificationCache, modifications); + } else { + boolean isChunkEnoughLarge = true; + boolean isPageEnoughLarge = true; + for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) { + for (ChunkMetadata chunkMetadata : chunkMetadatas) { + if (chunkMetadata.getNumOfPoints() + < IoTDBDescriptor.getInstance() + .getConfig() + .getMergePagePointNumberThreshold()) { + isPageEnoughLarge = false; + } + if (chunkMetadata.getNumOfPoints() + < IoTDBDescriptor.getInstance() + .getConfig() + .getMergeChunkPointNumberThreshold()) { + isChunkEnoughLarge = false; + } + } + } + // if a chunk is large enough, it's page must be large enough too + if (isChunkEnoughLarge) { + logger.debug( + "{} [Compaction] chunk enough large, use append chunk merge", storageGroup); + // append page in chunks, so we do not have to deserialize a chunk + writeByAppendChunkMerge( + device, + compactionWriteRateLimiter, + sensorReaderChunkMetadataListEntry, + targetResource, + writer); + } else if (isPageEnoughLarge) { + logger.debug( + "{} [Compaction] page enough large, use append page merge", storageGroup); + // append page in chunks, so we do not have to deserialize a chunk + writeByAppendPageMerge( + device, + compactionWriteRateLimiter, + sensorReaderChunkMetadataListEntry, + targetResource, + writer); + } else { + logger.debug( + "{} [Compaction] page too small, use deserialize page merge", storageGroup); + // we have to deserialize chunks to merge pages + writeByDeserializePageMerge( + device, + compactionWriteRateLimiter, + sensorReaderChunkMetadataListEntry, + targetResource, + writer, + modificationCache, + modifications); + } } } } } + writer.endChunkGroup(); + if (compactionLogger != null) { + compactionLogger.logDevice(device, writer.getPos()); + } } - writer.endChunkGroup(); - if (compactionLogger != null) { - compactionLogger.logDevice(device, writer.getPos()); - } - } - for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) { - reader.close(); - } + for (TsFileResource tsFileResource : tsFileResources) { + targetResource.updatePlanIndexes(tsFileResource); + } + targetResource.serialize(); + writer.endFile(); + targetResource.close(); - for (TsFileResource tsFileResource : tsFileResources) { - targetResource.updatePlanIndexes(tsFileResource); + } finally { + for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) { + reader.close(); + } } - targetResource.serialize(); - writer.endFile(); - targetResource.close(); } private static TsFileSequenceReader buildReaderFromTsFileResource(
