This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5fd7de3b4fe69e6558cf35c8e9d60660ba5f1bb7 Author: 张凌哲 <[email protected]> AuthorDate: Mon Oct 19 15:27:09 2020 +0800 uncomment flush all code --- .../level/LevelTsFileManagement.java | 199 +++++++++------------ 1 file changed, 88 insertions(+), 111 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java index a121871..d0e5d2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java @@ -88,10 +88,10 @@ public class LevelTsFileManagement extends TsFileManagement { private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>(); private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>(); -// private double forkedSeqListPointNum = 0; -// private double forkedSeqListMeasurementSize = 0; -// private double forkedUnSeqListPointNum = 0; -// private double forkedUnSeqListMeasurementSize = 0; + private double forkedSeqListPointNum = 0; + private double forkedSeqListMeasurementSize = 0; + private double forkedUnSeqListPointNum = 0; + private double forkedUnSeqListMeasurementSize = 0; public LevelTsFileManagement(String storageGroupName, String storageGroupDir) { super(storageGroupName, storageGroupDir); @@ -441,71 +441,14 @@ public class LevelTsFileManagement extends TsFileManagement { .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources), maxUnseqLevelNum); } -// private Pair<Double, Double> forkTsFileList( -// List<List<TsFileResource>> forkedTsFileResources, -// List rawTsFileResources, int currMaxLevel) { -// forkedTsFileResources.clear(); -// // just fork part of the TsFile list, controlled by max_merge_chunk_point -// long pointNum = 0; -// // all flush to target file -// ICardinality measurementSet = new HyperLogLog(13); -// for (int i = 0; i < currMaxLevel - 1; i++) { -// List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); -// Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources -// .get(i); -// synchronized (levelRawTsFileResources) { -// for (TsFileResource tsFileResource : levelRawTsFileResources) { -// if (tsFileResource.isClosed()) { -// String path = tsFileResource.getTsFile().getAbsolutePath(); -// try { -// if (tsFileResource.getTsFile().exists()) { -// TsFileSequenceReader reader = new TsFileSequenceReader(path); -// List<Path> pathList = reader.getAllPaths(); -// for (Path sensorPath : pathList) { -// measurementSet.offer(sensorPath.getFullPath()); -// List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath); -// for (ChunkMetadata chunkMetadata : chunkMetadataList) { -// pointNum += chunkMetadata.getNumOfPoints(); -// } -// } -// } else { -// logger.info("{} tsfile does not exist", path); -// } -// } catch (IOException e) { -// logger.error( -// "{} tsfile reader creates error", path, e); -// } -// } -// if (measurementSet.cardinality() > 0 -// && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { -// forkedLevelTsFileResources.add(tsFileResource); -// break; -// } -// forkedLevelTsFileResources.add(tsFileResource); -// } -// } -// -// if (measurementSet.cardinality() > 0 -// && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { -// forkedTsFileResources.add(forkedLevelTsFileResources); -// break; -// } -// forkedTsFileResources.add(forkedLevelTsFileResources); -// } -// -// // fill in empty file -// while (forkedTsFileResources.size() < currMaxLevel) { -// List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>(); -// forkedTsFileResources.add(emptyForkedLevelTsFileResources); -// } -// -// return new Pair<>((double) pointNum, (double) measurementSet.cardinality()); -// } - - private void forkTsFileList( + private Pair<Double, Double> forkTsFileList( List<List<TsFileResource>> forkedTsFileResources, List rawTsFileResources, int currMaxLevel) { forkedTsFileResources.clear(); + // just fork part of the TsFile list, controlled by max_merge_chunk_point + long pointNum = 0; + // all flush to target file + ICardinality measurementSet = new HyperLogLog(13); for (int i = 0; i < currMaxLevel - 1; i++) { List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources @@ -513,10 +456,41 @@ public class LevelTsFileManagement extends TsFileManagement { synchronized (levelRawTsFileResources) { for (TsFileResource tsFileResource : levelRawTsFileResources) { if (tsFileResource.isClosed()) { + String path = tsFileResource.getTsFile().getAbsolutePath(); + try { + if (tsFileResource.getTsFile().exists()) { + TsFileSequenceReader reader = new TsFileSequenceReader(path); + List<Path> pathList = reader.getAllPaths(); + for (Path sensorPath : pathList) { + measurementSet.offer(sensorPath.getFullPath()); + List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath); + for (ChunkMetadata chunkMetadata : chunkMetadataList) { + pointNum += chunkMetadata.getNumOfPoints(); + } + } + reader.close(); + } else { + logger.info("{} tsfile does not exist", path); + } + } catch (IOException e) { + logger.error( + "{} tsfile reader creates error", path, e); + } + } + if (measurementSet.cardinality() > 0 + && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { forkedLevelTsFileResources.add(tsFileResource); + break; } + forkedLevelTsFileResources.add(tsFileResource); } } + + if (measurementSet.cardinality() > 0 + && pointNum / measurementSet.cardinality() >= maxChunkPointNum) { + forkedTsFileResources.add(forkedLevelTsFileResources); + break; + } forkedTsFileResources.add(forkedLevelTsFileResources); } @@ -525,6 +499,8 @@ public class LevelTsFileManagement extends TsFileManagement { List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>(); forkedTsFileResources.add(emptyForkedLevelTsFileResources); } + + return new Pair<>((double) pointNum, (double) measurementSet.cardinality()); } @Override @@ -545,59 +521,60 @@ public class LevelTsFileManagement extends TsFileManagement { long startTimeMillis = System.currentTimeMillis(); try { logger.info("{} start to filter hot compaction condition", storageGroupName); -// double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; -// double measurementSize = -// sequence ? forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize; -// logger -// .info("{} current sg subLevel point num: {}, approximate measurement num: {}", -// storageGroupName, pointNum, -// measurementSize); + double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; + double measurementSize = + sequence ? forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize; + logger + .info("{} current sg subLevel point num: {}, approximate measurement num: {}", + storageGroupName, pointNum, + measurementSize); HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir, storageGroupName); -// if (measurementSize > 0 && pointNum / measurementSize >= maxChunkPointNum) { -// // merge all tsfile to last level -// logger.info("{} merge {} level tsfiles to next level", storageGroupName, -// mergeResources.size()); -// flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); -// } else { - for (int i = 0; i < currMaxLevel - 1; i++) { - if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) { - if (!sequence && i == currMaxLevel - 2) { - merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE); - } else { - for (TsFileResource mergeResource : mergeResources.get(i)) { - hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); - } - File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), - i + 1); - hotCompactionLogger.logSequence(sequence); - hotCompactionLogger.logFile(TARGET_NAME, newLevelFile); - logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level vm", - storageGroupName, i, mergeResources.get(i).size()); - - TsFileResource newResource = new TsFileResource(newLevelFile); - HotCompactionUtils - .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger, - new HashSet<>(), sequence); - writeLock(); - try { - deleteLevelFiles(timePartition, mergeResources.get(i)); - hotCompactionLogger.logMergeFinish(); - if (sequence) { - sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); - } else { - unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + if (measurementSize > 0 && pointNum / measurementSize >= maxChunkPointNum) { + // merge all tsfile to last level + logger.info("{} merge {} level tsfiles to next level", storageGroupName, + mergeResources.size()); + flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); + } else { + for (int i = 0; i < currMaxLevel - 1; i++) { + if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) { + if (!sequence && i == currMaxLevel - 2) { + // do not merge current unseq file level to upper level and just merge all of them to seq file + merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE); + } else { + for (TsFileResource mergeResource : mergeResources.get(i)) { + hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile()); } - if (mergeResources.size() > i + 1) { - mergeResources.get(i + 1).add(newResource); + File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), + i + 1); + hotCompactionLogger.logSequence(sequence); + hotCompactionLogger.logFile(TARGET_NAME, newLevelFile); + logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level vm", + storageGroupName, i, mergeResources.get(i).size()); + + TsFileResource newResource = new TsFileResource(newLevelFile); + HotCompactionUtils + .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger, + new HashSet<>(), sequence); + writeLock(); + try { + deleteLevelFiles(timePartition, mergeResources.get(i)); + hotCompactionLogger.logMergeFinish(); + if (sequence) { + sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + } else { + unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource); + } + if (mergeResources.size() > i + 1) { + mergeResources.get(i + 1).add(newResource); + } + } finally { + writeUnlock(); } - } finally { - writeUnlock(); } } } } -// } hotCompactionLogger.close(); File logFile = FSFactoryProducer.getFSFactory() .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
