This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a601a2463e866a0d984420d4c91ca95c23cb01d Author: 张凌哲 <[email protected]> AuthorDate: Fri Sep 25 01:13:31 2020 +0800 fix bug --- .../tsfilemanagement/utils/HotCompactionUtils.java | 39 ++++++++++++++-------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java index d669aae..2df848f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -105,10 +106,10 @@ public class HotCompactionUtils { return maxVersion; } - private static void fillTsFileDevicesMap(List<TsFileResource> subLevelResources, - Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, - Map<String, List<String>> tsFileDevicesMap, String storageGroup) + private static Set<String> getTsFileDevicesSet(List<TsFileResource> subLevelResources, + Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String storageGroup) throws IOException { + Set<String> tsFileDevicesSet = new HashSet<>(); for (TsFileResource levelResource : subLevelResources) { TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, tsFileSequenceReaderMap, @@ -116,11 +117,20 @@ public class HotCompactionUtils { if (reader == null) { continue; } - tsFileDevicesMap - .putIfAbsent(levelResource.getTsFile().getAbsolutePath(), reader.getAllDevices()); + tsFileDevicesSet.addAll(reader.getAllDevices()); } + return tsFileDevicesSet; } + /** + * + * @param targetResource the target resource to be merged to + * @param tsFileResources the source resource to be merged + * @param storageGroup the storage group name + * @param hotCompactionLogger the logger + * @param devices the devices to be skipped(used by recover) + * @throws IOException + */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public static void merge(TsFileResource targetResource, List<TsFileResource> tsFileResources, String storageGroup, @@ -128,14 +138,16 @@ public class HotCompactionUtils { Set<String> devices, boolean sequence) throws IOException { RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile()); Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>(); - Map<String, List<String>> tsFileDevicesMap = new HashMap<>(); - Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new HashMap<>(); RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter(); - fillTsFileDevicesMap(tsFileResources, tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup); - for (String device : devices) { + Set<String> tsFileDevicesMap = getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, + storageGroup); + for (String device : tsFileDevicesMap) { + if (devices.contains(device)) { + continue; + } + writer.startChunkGroup(device); // sort chunkMeta by measurement Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> measurementChunkMetadataMap = new HashMap<>(); - writer.startChunkGroup(device); for (TsFileResource levelResource : tsFileResources) { TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, tsFileSequenceReaderMap, storageGroup); @@ -148,7 +160,7 @@ public class HotCompactionUtils { if (measurementChunkMetadataMap.containsKey(measurementUid)) { readerChunkMetadataMap = measurementChunkMetadataMap.get(measurementUid); } else { - readerChunkMetadataMap = new HashMap<>(); + readerChunkMetadataMap = new LinkedHashMap<>(); } List<ChunkMetadata> chunkMetadataList; if (readerChunkMetadataMap.containsKey(reader)) { @@ -164,9 +176,9 @@ public class HotCompactionUtils { } } if (!sequence) { + long maxVersion = Long.MIN_VALUE; for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) { - long maxVersion = Long.MIN_VALUE; Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>(); maxVersion = readUnseqChunk(entry.getValue(), maxVersion, timeValuePairMap); Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values() @@ -189,12 +201,11 @@ public class HotCompactionUtils { MergeManager .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize()); chunkWriter.writeToFileWriter(writer); - writer.writeVersion(maxVersion); - writer.endChunkGroup(); if (hotCompactionLogger != null) { hotCompactionLogger.logDevice(device, writer.getPos()); } } + writer.endChunkGroup(); } else { for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) {
