This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 257fa620b650f0499b31fd9248307989590273ca Author: 张凌哲 <[email protected]> AuthorDate: Wed Oct 7 16:23:51 2020 +0800 comment to fix oom temp --- .../level/LevelTsFileManagement.java | 205 ++++++++++++--------- 1 file changed, 113 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java index 8c5f482..a121871 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java @@ -88,10 +88,10 @@ public class LevelTsFileManagement extends TsFileManagement { private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>(); private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>(); - private double forkedSeqListPointNum = 0; - private double forkedSeqListMeasurementSize = 0; - private double forkedUnSeqListPointNum = 0; - private double forkedUnSeqListMeasurementSize = 0; +// private double forkedSeqListPointNum = 0; +// private double forkedSeqListMeasurementSize = 0; +// private double forkedUnSeqListPointNum = 0; +// private double forkedUnSeqListMeasurementSize = 0; public LevelTsFileManagement(String storageGroupName, String storageGroupDir) { super(storageGroupName, storageGroupDir); @@ -431,28 +431,81 @@ public class LevelTsFileManagement extends TsFileManagement { @Override public void forkCurrentFileList(long timePartition) { - Pair<Double, Double> seqResult = forkTsFileList( + forkTsFileList( forkedSequenceTsFileResources, sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources), maxLevelNum); - forkedSeqListPointNum = seqResult.left; - forkedSeqListMeasurementSize = seqResult.right; - Pair<Double, Double> unSeqResult = forkTsFileList( + 
forkTsFileList( forkedUnSequenceTsFileResources, unSequenceTsFileResources .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources), maxUnseqLevelNum); - forkedUnSeqListPointNum = unSeqResult.left; - forkedUnSeqListMeasurementSize = unSeqResult.right; } - private Pair<Double, Double> forkTsFileList( +// private Pair<Double, Double> forkTsFileList( +// List<List<TsFileResource>> forkedTsFileResources, +// List rawTsFileResources, int currMaxLevel) { +// forkedTsFileResources.clear(); +// // just fork part of the TsFile list, controlled by max_merge_chunk_point +// long pointNum = 0; +// // all flush to target file +// ICardinality measurementSet = new HyperLogLog(13); +// for (int i = 0; i < currMaxLevel - 1; i++) { +// List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); +// Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources +// .get(i); +// synchronized (levelRawTsFileResources) { +// for (TsFileResource tsFileResource : levelRawTsFileResources) { +// if (tsFileResource.isClosed()) { +// String path = tsFileResource.getTsFile().getAbsolutePath(); +// try { +// if (tsFileResource.getTsFile().exists()) { +// TsFileSequenceReader reader = new TsFileSequenceReader(path); +// List<Path> pathList = reader.getAllPaths(); +// for (Path sensorPath : pathList) { +// measurementSet.offer(sensorPath.getFullPath()); +// List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath); +// for (ChunkMetadata chunkMetadata : chunkMetadataList) { +// pointNum += chunkMetadata.getNumOfPoints(); +// } +// } +// } else { +// logger.info("{} tsfile does not exist", path); +// } +// } catch (IOException e) { +// logger.error( +// "{} tsfile reader creates error", path, e); +// } +// } +// if (measurementSet.cardinality() > 0 +// && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { +// forkedLevelTsFileResources.add(tsFileResource); +// break; +// } +// 
forkedLevelTsFileResources.add(tsFileResource); +// } +// } +// +// if (measurementSet.cardinality() > 0 +// && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { +// forkedTsFileResources.add(forkedLevelTsFileResources); +// break; +// } +// forkedTsFileResources.add(forkedLevelTsFileResources); +// } +// +// // fill in empty file +// while (forkedTsFileResources.size() < currMaxLevel) { +// List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>(); +// forkedTsFileResources.add(emptyForkedLevelTsFileResources); +// } +// +// return new Pair<>((double) pointNum, (double) measurementSet.cardinality()); +// } + + private void forkTsFileList( List<List<TsFileResource>> forkedTsFileResources, List rawTsFileResources, int currMaxLevel) { forkedTsFileResources.clear(); - // just fork part of the TsFile list, controlled by max_merge_chunk_point - long pointNum = 0; - // all flush to target file - ICardinality measurementSet = new HyperLogLog(13); for (int i = 0; i < currMaxLevel - 1; i++) { List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources @@ -460,40 +513,10 @@ public class LevelTsFileManagement extends TsFileManagement { synchronized (levelRawTsFileResources) { for (TsFileResource tsFileResource : levelRawTsFileResources) { if (tsFileResource.isClosed()) { - String path = tsFileResource.getTsFile().getAbsolutePath(); - try { - if (tsFileResource.getTsFile().exists()) { - TsFileSequenceReader reader = new TsFileSequenceReader(path); - List<Path> pathList = reader.getAllPaths(); - for (Path sensorPath : pathList) { - measurementSet.offer(sensorPath.getFullPath()); - List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath); - for (ChunkMetadata chunkMetadata : chunkMetadataList) { - pointNum += chunkMetadata.getNumOfPoints(); - } - } - } else { - logger.info("{} tsfile does not exist", path); - } - } catch 
(IOException e) { - logger.error( - "{} tsfile reader creates error", path, e); - } - } - if (measurementSet.cardinality() > 0 - && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { forkedLevelTsFileResources.add(tsFileResource); - break; } - forkedLevelTsFileResources.add(tsFileResource); } } - - if (measurementSet.cardinality() > 0 - && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { - forkedTsFileResources.add(forkedLevelTsFileResources); - break; - } forkedTsFileResources.add(forkedLevelTsFileResources); } @@ -502,8 +525,6 @@ public class LevelTsFileManagement extends TsFileManagement { List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>(); forkedTsFileResources.add(emptyForkedLevelTsFileResources); } - - return new Pair<>((double) pointNum, (double) measurementSet.cardinality()); } @Override @@ -524,59 +545,59 @@ public class LevelTsFileManagement extends TsFileManagement { long startTimeMillis = System.currentTimeMillis(); try { logger.info("{} start to filter hot compaction condition", storageGroupName); - double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; - double measurementSize = - sequence ? forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize; - logger - .info("{} current sg subLevel point num: {}, approximate measurement num: {}", - storageGroupName, pointNum, - measurementSize); +// double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; +// double measurementSize = +// sequence ? 
forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize; +// logger +// .info("{} current sg subLevel point num: {}, approximate measurement num: {}", +// storageGroupName, pointNum, +// measurementSize); HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir, storageGroupName); - if (measurementSize > 0 && pointNum / measurementSize >= maxChunkPointNum) { - // merge all tsfile to last level - logger.info("{} merge {} level tsfiles to next level", storageGroupName, - mergeResources.size()); - flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); - } else { - for (int i = 0; i < currMaxLevel - 1; i++) { - if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) { - if (!sequence && i == currMaxLevel - 2) { - merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE); - } else { - for (TsFileResource mergeResource : mergeResources.get(i)) { - hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); +// if (measurementSize > 0 && pointNum / measurementSize >= maxChunkPointNum) { +// // merge all tsfile to last level +// logger.info("{} merge {} level tsfiles to next level", storageGroupName, +// mergeResources.size()); +// flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); +// } else { + for (int i = 0; i < currMaxLevel - 1; i++) { + if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) { + if (!sequence && i == currMaxLevel - 2) { + merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE); + } else { + for (TsFileResource mergeResource : mergeResources.get(i)) { + hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); + } + File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), + i + 1); + hotCompactionLogger.logSequence(sequence); + hotCompactionLogger.logFile(TARGET_NAME, newLevelFile); + logger.info("{} [Hot Compaction] merge level-{}'s {} 
tsfiles to next level vm", + storageGroupName, i, mergeResources.get(i).size()); + + TsFileResource newResource = new TsFileResource(newLevelFile); + HotCompactionUtils + .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger, + new HashSet<>(), sequence); + writeLock(); + try { + deleteLevelFiles(timePartition, mergeResources.get(i)); + hotCompactionLogger.logMergeFinish(); + if (sequence) { + sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + } else { + unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); } - File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), - i + 1); - hotCompactionLogger.logSequence(sequence); - hotCompactionLogger.logFile(TARGET_NAME, newLevelFile); - logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level vm", - storageGroupName, i, mergeResources.get(i).size()); - - TsFileResource newResource = new TsFileResource(newLevelFile); - HotCompactionUtils - .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger, - new HashSet<>(), sequence); - writeLock(); - try { - deleteLevelFiles(timePartition, mergeResources.get(i)); - hotCompactionLogger.logMergeFinish(); - if (sequence) { - sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); - } else { - unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); - } - if (mergeResources.size() > i + 1) { - mergeResources.get(i + 1).add(newResource); - } - } finally { - writeUnlock(); + if (mergeResources.size() > i + 1) { + mergeResources.get(i + 1).add(newResource); } + } finally { + writeUnlock(); } } } } +// } hotCompactionLogger.close(); File logFile = FSFactoryProducer.getFSFactory() .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
