This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit acc3d49df4ec1b32009e156766c645eb97eddca2 Author: 张凌哲 <[email protected]> AuthorDate: Fri Sep 25 00:11:29 2020 +0800 upgrade merge method --- .../tsfilemanagement/utils/HotCompactionUtils.java | 163 ++++++++++----------- 1 file changed, 78 insertions(+), 85 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java index 1ee6faf..d669aae 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java @@ -23,8 +23,11 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair; import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -37,7 +40,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; -import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.reader.BatchDataIterator; import org.apache.iotdb.tsfile.read.reader.IChunkReader; import org.apache.iotdb.tsfile.read.reader.IPointReader; @@ -58,29 +60,14 @@ public class HotCompactionUtils { throw new IllegalStateException("Utility class"); } - private static Pair<ChunkMetadata, Chunk> readSeqChunk(String storageGroup, - Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, - Map<String, List<String>> tsFileDevicesMap, String deviceId, - String measurementId, - List<TsFileResource> levelResources) - throws IOException { + private static Pair<ChunkMetadata, Chunk> 
readSeqChunk( + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException { ChunkMetadata newChunkMetadata = null; Chunk newChunk = null; - for (TsFileResource levelResource : levelResources) { - TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, - tsFileSequenceReaderMap, - storageGroup); - if (reader == null || !tsFileDevicesMap.get(levelResource.getTsFile().getAbsolutePath()) - .contains(deviceId)) { - continue; - } - List<ChunkMetadata> chunkMetadataList = reader - .getChunkMetadataList(new Path(deviceId, measurementId)); - if (chunkMetadataList == null) { - continue; - } - for (ChunkMetadata chunkMetadata : chunkMetadataList) { - Chunk chunk = reader.readMemChunk(chunkMetadata); + for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry : readerChunkMetadataMap + .entrySet()) { + for (ChunkMetadata chunkMetadata : entry.getValue()) { + Chunk chunk = entry.getKey().readMemChunk(chunkMetadata); if (newChunkMetadata == null) { newChunkMetadata = chunkMetadata; newChunk = chunk; @@ -93,20 +80,14 @@ public class HotCompactionUtils { return new Pair<>(newChunkMetadata, newChunk); } - private static long readUnseqChunk(String storageGroup, - Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String deviceId, long maxVersion, - String measurementId, - Map<Long, TimeValuePair> timeValuePairMap, List<TsFileResource> levelResources) + private static long readUnseqChunk( + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, long maxVersion, + Map<Long, TimeValuePair> timeValuePairMap) throws IOException { - for (TsFileResource levelResource : levelResources) { - TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, - tsFileSequenceReaderMap, - storageGroup); - if (reader == null) { - continue; - } - List<ChunkMetadata> chunkMetadataList = reader - .getChunkMetadataList(new Path(deviceId, measurementId)); + for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry : 
readerChunkMetadataMap + .entrySet()) { + TsFileSequenceReader reader = entry.getKey(); + List<ChunkMetadata> chunkMetadataList = entry.getValue(); for (ChunkMetadata chunkMetadata : chunkMetadataList) { maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion); IChunkReader chunkReader = new ChunkReaderByTimestamp( @@ -124,9 +105,7 @@ public class HotCompactionUtils { return maxVersion; } - private static void fillDeviceMeasurementMap(Set<String> devices, - Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap, - List<TsFileResource> subLevelResources, + private static void fillTsFileDevicesMap(List<TsFileResource> subLevelResources, Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, Map<String, List<String>> tsFileDevicesMap, String storageGroup) throws IOException { @@ -139,20 +118,6 @@ public class HotCompactionUtils { } tsFileDevicesMap .putIfAbsent(levelResource.getTsFile().getAbsolutePath(), reader.getAllDevices()); - List<Path> allPaths = reader.getAllPaths(); - Map<String, TSDataType> allMeasurements = reader.getAllMeasurements(); - // device, measurement -> chunk metadata list - for (Path path : allPaths) { - if (devices.contains(path.getDevice())) { - continue; - } - Map<String, MeasurementSchema> measurementSchemaMap = deviceMeasurementMap - .computeIfAbsent(path.getDevice(), k -> new HashMap<>()); - - // measurement, chunk metadata list - measurementSchemaMap.computeIfAbsent(path.getMeasurement(), k -> - new MeasurementSchema(k, allMeasurements.get(path.getMeasurement()))); - } } } @@ -166,47 +131,75 @@ public class HotCompactionUtils { Map<String, List<String>> tsFileDevicesMap = new HashMap<>(); Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new HashMap<>(); RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter(); - fillDeviceMeasurementMap(devices, deviceMeasurementMap, tsFileResources, - tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup); - if (!sequence) { - for 
(Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap - .entrySet()) { - String deviceId = deviceMeasurementEntry.getKey(); - writer.startChunkGroup(deviceId); - long maxVersion = Long.MIN_VALUE; - for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue() + fillTsFileDevicesMap(tsFileResources, tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup); + for (String device : devices) { + // sort chunkMeta by measurement + Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> measurementChunkMetadataMap = new HashMap<>(); + writer.startChunkGroup(device); + for (TsFileResource levelResource : tsFileResources) { + TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, + tsFileSequenceReaderMap, storageGroup); + Map<String, List<ChunkMetadata>> chunkMetadataMap = reader + .readChunkMetadataInDevice(device); + for (Entry<String, List<ChunkMetadata>> entry : chunkMetadataMap.entrySet()) { + for (ChunkMetadata chunkMetadata : entry.getValue()) { + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap; + String measurementUid = chunkMetadata.getMeasurementUid(); + if (measurementChunkMetadataMap.containsKey(measurementUid)) { + readerChunkMetadataMap = measurementChunkMetadataMap.get(measurementUid); + } else { + readerChunkMetadataMap = new HashMap<>(); + } + List<ChunkMetadata> chunkMetadataList; + if (readerChunkMetadataMap.containsKey(reader)) { + chunkMetadataList = readerChunkMetadataMap.get(reader); + } else { + chunkMetadataList = new ArrayList<>(); + } + chunkMetadataList.add(chunkMetadata); + readerChunkMetadataMap.put(reader, chunkMetadataList); + measurementChunkMetadataMap + .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap); + } + } + } + if (!sequence) { + for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) { - String measurementId = entry.getKey(); + long maxVersion = 
Long.MIN_VALUE; Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>(); - maxVersion = readUnseqChunk(storageGroup, tsFileSequenceReaderMap, deviceId, - maxVersion, measurementId, timeValuePairMap, tsFileResources); - IChunkWriter chunkWriter = new ChunkWriterImpl(entry.getValue()); + maxVersion = readUnseqChunk(entry.getValue(), maxVersion, timeValuePairMap); + Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values() + .iterator(); + if (!chunkMetadataListIterator.hasNext()) { + continue; + } + List<ChunkMetadata> chunkMetadataList = chunkMetadataListIterator.next(); + if (chunkMetadataList.size() <= 0) { + continue; + } + IChunkWriter chunkWriter = new ChunkWriterImpl( + new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType())); for (TimeValuePair timeValuePair : timeValuePairMap.values()) { writeTVPair(timeValuePair, chunkWriter); - targetResource.updateStartTime(deviceId, timeValuePair.getTimestamp()); - targetResource.updateEndTime(deviceId, timeValuePair.getTimestamp()); + targetResource.updateStartTime(device, timeValuePair.getTimestamp()); + targetResource.updateEndTime(device, timeValuePair.getTimestamp()); } // wait for limit write MergeManager .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize()); chunkWriter.writeToFileWriter(writer); + writer.writeVersion(maxVersion); + writer.endChunkGroup(); + if (hotCompactionLogger != null) { + hotCompactionLogger.logDevice(device, writer.getPos()); + } } - writer.writeVersion(maxVersion); - writer.endChunkGroup(); - if (hotCompactionLogger != null) { - hotCompactionLogger.logDevice(deviceId, writer.getPos()); - } - } - } else { - for (Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap - .entrySet()) { - String deviceId = deviceMeasurementEntry.getKey(); - writer.startChunkGroup(deviceId); - for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue() + } else { + for 
(Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) { - String measurementId = entry.getKey(); - Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk(storageGroup, - tsFileSequenceReaderMap, tsFileDevicesMap, deviceId, measurementId, tsFileResources); + Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk( + entry.getValue()); ChunkMetadata newChunkMetadata = chunkPair.left; Chunk newChunk = chunkPair.right; if (newChunkMetadata != null && newChunk != null) { @@ -214,13 +207,13 @@ public class HotCompactionUtils { MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, (long)newChunk.getHeader().getDataSize() + newChunk.getData().position()); writer.writeChunk(newChunk, newChunkMetadata); - targetResource.updateStartTime(deviceId, newChunkMetadata.getStartTime()); - targetResource.updateEndTime(deviceId, newChunkMetadata.getEndTime()); + targetResource.updateStartTime(device, newChunkMetadata.getStartTime()); + targetResource.updateEndTime(device, newChunkMetadata.getEndTime()); } } writer.endChunkGroup(); if (hotCompactionLogger != null) { - hotCompactionLogger.logDevice(deviceId, writer.getPos()); + hotCompactionLogger.logDevice(device, writer.getPos()); } } }
