This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f6008ab0c56bce6d7cb1adbc5e0775e252727690 Author: 张凌哲 <[email protected]> AuthorDate: Thu Sep 24 12:06:32 2020 +0800 add device chunk point cache --- .../db/engine/cache/FileChunkPointSizeCache.java | 68 ++++++++++++++++ .../engine/storagegroup/StorageGroupProcessor.java | 1 + .../db/engine/storagegroup/TsFileProcessor.java | 33 +++++++- .../db/engine/storagegroup/TsFileResource.java | 6 +- .../level/LevelTsFileManagement.java | 92 ++++++++-------------- .../tsfilemanagement/utils/HotCompactionUtils.java | 20 +++-- 6 files changed, 150 insertions(+), 70 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/FileChunkPointSizeCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/FileChunkPointSizeCache.java new file mode 100644 index 0000000..38ab644 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/FileChunkPointSizeCache.java @@ -0,0 +1,68 @@ +package org.apache.iotdb.db.engine.cache; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileChunkPointSizeCache { + + private static final Logger logger = LoggerFactory.getLogger(FileChunkPointSizeCache.class); + + // (absolute path,avg chunk point size) + private Map<String, Map<String, Long>> tsfileDeviceChunkPointCache; + + private FileChunkPointSizeCache() { + tsfileDeviceChunkPointCache = new HashMap<>(); + } + + public static FileChunkPointSizeCache getInstance() { + return FileChunkPointSizeCacheHolder.INSTANCE; + } + + public Map<String, Long> get(File tsfile) { + String path = tsfile.getAbsolutePath(); + return tsfileDeviceChunkPointCache.computeIfAbsent(path, k -> { + Map<String, Long> deviceChunkPointMap = new HashMap<>(); + try { + if (tsfile.exists()) { + TsFileSequenceReader reader = new TsFileSequenceReader(path); + List<Path> pathList = reader.getAllPaths(); + for (Path sensorPath : pathList) { + String device = sensorPath.getDevice(); + long chunkPointNum = deviceChunkPointMap.getOrDefault(device, 0L); + List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath); + for (ChunkMetadata chunkMetadata : chunkMetadataList) { + chunkPointNum += chunkMetadata.getNumOfPoints(); + } + deviceChunkPointMap.put(device, chunkPointNum); + } + } else { + logger.info("{} tsfile does not exist", path); + } + } catch (IOException e) { + logger.error( + "{} tsfile reader creates error", path, e); + } + return deviceChunkPointMap; + }); + } + + public void put(String tsfilePath, Map<String, Long> deviceChunkPointMap) { + tsfileDeviceChunkPointCache.put(tsfilePath, deviceChunkPointMap); + } + + /** + * singleton pattern. + */ + private static class FileChunkPointSizeCacheHolder { + + private static final FileChunkPointSizeCache INSTANCE = new FileChunkPointSizeCache(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 28783ac..0011532 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -1580,6 +1580,7 @@ public class StorageGroupProcessor { tsFileManagement.new HotCompactionMergeTask(this::closeHotCompactionMergeCallBack, tsFileProcessor.getTimeRangeId())); } catch (RejectedExecutionException | IOException e) { + e.printStackTrace(); this.closeHotCompactionMergeCallBack(); logger.error("{} hot compaction submit task failed", storageGroupName); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index bd48984..136b979 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -24,8 +24,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -35,6 +37,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.conf.adapter.CompressionRatio; import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter; +import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache; import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable; @@ -165,11 +168,13 @@ public class TsFileProcessor { } // update start time of this memtable - tsFileResource.updateStartTime(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime()); + tsFileResource + .updateStartTime(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime()); //for sequence tsfile, we update the endTime only when the file is prepared to be closed. //for unsequence tsfile, we have to update the endTime for each insertion. if (!sequence) { - tsFileResource.updateEndTime(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime()); + tsFileResource + .updateEndTime(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime()); } } @@ -209,7 +214,8 @@ public class TsFileProcessor { } tsFileResource - .updateStartTime(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[start]); + .updateStartTime(insertTabletPlan.getDeviceId().getFullPath(), + insertTabletPlan.getTimes()[start]); //for sequence tsfile, we update the endTime only when the file is prepared to be closed. //for unsequence tsfile, we have to update the endTime for each insertion. @@ -645,8 +651,29 @@ public class TsFileProcessor { } } + private void updateDeviceChunkPointSizeCache() { + Map<String, Map<String, List<ChunkMetadata>>> deviceMeasurementChunkMetadataMap = writer + .getMetadatasForQuery(); + Map<String, Long> deviceChunkPointMap = new HashMap<>(); + for (Entry<String, Map<String, List<ChunkMetadata>>> deviceMeasurementChunkMetadataEntry : deviceMeasurementChunkMetadataMap + .entrySet()) { + String device = deviceMeasurementChunkMetadataEntry.getKey(); + long chunkPointNum = 0; + for (Entry<String, List<ChunkMetadata>> measurementChunkMetadataEntry : deviceMeasurementChunkMetadataEntry + .getValue().entrySet()) { + for (ChunkMetadata chunkMetadata : measurementChunkMetadataEntry.getValue()) { + chunkPointNum += chunkMetadata.getNumOfPoints(); + } + } + deviceChunkPointMap.put(device, chunkPointNum); + } + FileChunkPointSizeCache.getInstance() + .put(tsFileResource.getTsFile().getAbsolutePath(), deviceChunkPointMap); + } + private void endFile() throws IOException, TsFileProcessorException { long closeStartTime = System.currentTimeMillis(); + updateDeviceChunkPointSizeCache(); tsFileResource.serialize(); writer.endFile(); tsFileResource.cleanCloseFlag(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 2cbb7b8..f71f83a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -62,7 +62,8 @@ import org.slf4j.LoggerFactory; public class TsFileResource { private static final Logger logger = LoggerFactory.getLogger(TsFileResource.class); - private static Map<String, String> cachedDevicePool = CachedStringPool.getInstance().getCachedPool(); + private static Map<String, String> cachedDevicePool = CachedStringPool.getInstance() + .getCachedPool(); // tsfile private File file; @@ -783,7 +784,8 @@ public class TsFileResource { } while (true) { - String hardlinkSuffix = TsFileConstant.PATH_SEPARATOR + System.currentTimeMillis() + "_" + random.nextLong(); + String hardlinkSuffix = + TsFileConstant.PATH_SEPARATOR + System.currentTimeMillis() + "_" + random.nextLong(); File hardlink = new File(file.getAbsolutePath() + hardlinkSuffix); try { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java index 0c74be3..c9173f7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -43,17 +42,15 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.cache.ChunkMetadataCache; +import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement; import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer; import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger; import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; -import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,9 +70,9 @@ public class LevelTsFileManagement extends TsFileManagement { private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>(); private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>(); private long forkedSeqListPointNum = 0; - private Map<Path, MeasurementSchema> forkedSeqListPathMeasurementSchemaMap = new HashMap<>(); + private Set<String> forkedSeqListDeviceSet = new HashSet<>(); private long forkedUnSeqListPointNum = 0; - private Map<Path, MeasurementSchema> forkedUnSeqListPathMeasurementSchemaMap = new HashMap<>(); + private Set<String> forkedUnSeqListDeviceSet = new HashSet<>(); public LevelTsFileManagement(String storageGroupName, String storageGroupDir) { super(storageGroupName, storageGroupDir); @@ -405,74 +402,55 @@ public class LevelTsFileManagement extends TsFileManagement { } @Override - public void forkCurrentFileList(long timePartition) throws IOException { - Pair<Long, Map<Path, MeasurementSchema>> seqResult = forkTsFileList( + public void forkCurrentFileList(long timePartition) { + Pair<Long, Set<String>> seqResult = forkTsFileList( forkedSequenceTsFileResources, sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources)); forkedSeqListPointNum = seqResult.left; - forkedSeqListPathMeasurementSchemaMap = seqResult.right; - Pair<Long, Map<Path, MeasurementSchema>> unSeqResult = forkTsFileList( + forkedSeqListDeviceSet = seqResult.right; + Pair<Long, Set<String>> unSeqResult = forkTsFileList( forkedUnSequenceTsFileResources, unSequenceTsFileResources .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources)); forkedUnSeqListPointNum = unSeqResult.left; - forkedUnSeqListPathMeasurementSchemaMap = unSeqResult.right; + forkedUnSeqListDeviceSet = unSeqResult.right; } - private Pair<Long, Map<Path, MeasurementSchema>> forkTsFileList( + private Pair<Long, Set<String>> forkTsFileList( List<List<TsFileResource>> forkedTsFileResources, - List rawTsFileResources) throws IOException { + List rawTsFileResources) { forkedTsFileResources.clear(); // just fork part of the TsFile list, controlled by max_merge_chunk_point long pointNum = 0; // all flush to target file - Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>(); + Set<String> deviceSet = new HashSet<>(); for (int i = 0; i < maxLevelNum - 1; i++) { List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources .get(i); - for (TsFileResource tsFileResource : levelRawTsFileResources) { - if (tsFileResource.isClosed()) { - RestorableTsFileIOWriter writer; - try { - writer = new RestorableTsFileIOWriter( - tsFileResource.getTsFile()); - } catch (Exception e) { - logger.error("[Hot Compaction] {} open writer failed", - tsFileResource.getTsFile().getPath(), e); - continue; - } - Map<String, Map<String, List<ChunkMetadata>>> schemaMap = writer - .getMetadatasForQuery(); - for (Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry : schemaMap - .entrySet()) { - String device = schemaMapEntry.getKey(); - for (Entry<String, List<ChunkMetadata>> entry : schemaMapEntry.getValue() - .entrySet()) { - String measurement = entry.getKey(); - List<ChunkMetadata> chunkMetadataList = entry.getValue(); - for (ChunkMetadata chunkMetadata : chunkMetadataList) { - pointNum += chunkMetadata.getNumOfPoints(); - } - pathMeasurementSchemaMap.computeIfAbsent(new Path(device, measurement), k -> - new MeasurementSchema(measurement, chunkMetadataList.get(0).getDataType())); + synchronized (levelRawTsFileResources) { + for (TsFileResource tsFileResource : levelRawTsFileResources) { + if (tsFileResource.isClosed()) { + Map<String, Long> chunkPointMap = FileChunkPointSizeCache.getInstance() + .get(tsFileResource.getTsFile()); + for (Entry<String, Long> deviceChunkPoint : chunkPointMap.entrySet()) { + deviceSet.add(deviceChunkPoint.getKey()); + pointNum += deviceChunkPoint.getValue(); } } - writer.close(); - forkedLevelTsFileResources.add(tsFileResource); - } - if (pathMeasurementSchemaMap.size() > 0 - && pointNum / pathMeasurementSchemaMap.size() >= maxChunkPointNum) { - break; + if (deviceSet.size() > 0 + && pointNum / deviceSet.size() >= maxChunkPointNum) { + break; + } } } - if (pathMeasurementSchemaMap.size() > 0 - && pointNum / pathMeasurementSchemaMap.size() >= maxChunkPointNum) { + if (deviceSet.size() > 0 + && pointNum / deviceSet.size() >= maxChunkPointNum) { break; } forkedTsFileResources.add(forkedLevelTsFileResources); } - return new Pair<>(pointNum, pathMeasurementSchemaMap); + return new Pair<>(pointNum, deviceSet); } @Override @@ -488,24 +466,22 @@ public class LevelTsFileManagement extends TsFileManagement { try { logger.info("{} start to filter hot compaction condition", storageGroupName); long pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; - Map<Path, MeasurementSchema> pathMeasurementSchemaMap = - sequence ? forkedSeqListPathMeasurementSchemaMap - : forkedUnSeqListPathMeasurementSchemaMap; - logger.info("{} current sg subLevel point num: {}, measurement num: {}", storageGroupName, - pointNum, pathMeasurementSchemaMap.size()); + Set<String> deviceSet = + sequence ? forkedSeqListDeviceSet : forkedUnSeqListDeviceSet; + logger + .info("{} current sg subLevel point num: {}, device num: {}", storageGroupName, pointNum, + deviceSet.size()); HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir, storageGroupName); - if (pathMeasurementSchemaMap.size() > 0 - && pointNum / pathMeasurementSchemaMap.size() > IoTDBDescriptor.getInstance().getConfig() - .getMergeChunkPointNumberThreshold()) { + if (deviceSet.size() > 0 && pointNum / deviceSet.size() > IoTDBDescriptor.getInstance() + .getConfig().getMergeChunkPointNumberThreshold()) { // merge all tsfile to last level logger.info("{} merge {} level tsfiles to next level", storageGroupName, mergeResources.size()); flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); } else { for (int i = 0; i < maxLevelNum - 1; i++) { - if (IoTDBDescriptor.getInstance().getConfig().getMaxFileNumInEachLevel() <= mergeResources - .get(i).size()) { + if (maxFileNumInEachLevel <= mergeResources.get(i).size()) { for (TsFileResource mergeResource : mergeResources.get(i)) { hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java index b9db31a..1ee6faf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java @@ -58,8 +58,9 @@ public class HotCompactionUtils { throw new IllegalStateException("Utility class"); } - private static Pair<ChunkMetadata, Chunk> writeSeqChunk(String storageGroup, - Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String deviceId, + private static Pair<ChunkMetadata, Chunk> readSeqChunk(String storageGroup, + Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, + Map<String, List<String>> tsFileDevicesMap, String deviceId, String measurementId, List<TsFileResource> levelResources) throws IOException { @@ -69,7 +70,8 @@ public class HotCompactionUtils { TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, tsFileSequenceReaderMap, storageGroup); - if (reader == null || !reader.getAllDevices().contains(deviceId)) { + if (reader == null || !tsFileDevicesMap.get(levelResource.getTsFile().getAbsolutePath()) + .contains(deviceId)) { continue; } List<ChunkMetadata> chunkMetadataList = reader @@ -125,7 +127,8 @@ public class HotCompactionUtils { private static void fillDeviceMeasurementMap(Set<String> devices, Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap, List<TsFileResource> subLevelResources, - Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String storageGroup) + Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, + Map<String, List<String>> tsFileDevicesMap, String storageGroup) throws IOException { for (TsFileResource levelResource : subLevelResources) { TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, @@ -134,6 +137,8 @@ public class HotCompactionUtils { if (reader == null) { continue; } + tsFileDevicesMap + .putIfAbsent(levelResource.getTsFile().getAbsolutePath(), reader.getAllDevices()); List<Path> allPaths = reader.getAllPaths(); Map<String, TSDataType> allMeasurements = reader.getAllMeasurements(); // device, measurement -> chunk metadata list @@ -158,10 +163,11 @@ public class HotCompactionUtils { Set<String> devices, boolean sequence) throws IOException { RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile()); Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>(); + Map<String, List<String>> tsFileDevicesMap = new HashMap<>(); Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new HashMap<>(); RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter(); fillDeviceMeasurementMap(devices, deviceMeasurementMap, tsFileResources, - tsFileSequenceReaderMap, storageGroup); + tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup); if (!sequence) { for (Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap .entrySet()) { @@ -199,8 +205,8 @@ public class HotCompactionUtils { for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue() .entrySet()) { String measurementId = entry.getKey(); - Pair<ChunkMetadata, Chunk> chunkPair = writeSeqChunk(storageGroup, - tsFileSequenceReaderMap, deviceId, measurementId, tsFileResources); + Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk(storageGroup, + tsFileSequenceReaderMap, tsFileDevicesMap, deviceId, measurementId, tsFileResources); ChunkMetadata newChunkMetadata = chunkPair.left; Chunk newChunk = chunkPair.right; if (newChunkMetadata != null && newChunk != null) {
