This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5980e44e08057c7c797c9a02f3a96f4a5b16dd18 Author: CRZbulabula <[email protected]> AuthorDate: Tue Sep 29 23:16:15 2020 +0800 change merge strategy Use the approximate average value to determine whether to merge --- .../level/LevelTsFileManagement.java | 149 ++++++++++++--------- 1 file changed, 86 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java index 2ad1de0..2b852d2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java @@ -40,7 +40,11 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; + +import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.engine.cache.ChunkMetadataCache; import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -69,10 +73,14 @@ public class LevelTsFileManagement extends TsFileManagement { private final Map<Long, List<List<TsFileResource>>> unSequenceTsFileResources = new ConcurrentSkipListMap<>(); private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>(); private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>(); - private long forkedSeqListPointNum = 0; - private Set<String> forkedSeqListDeviceSet = new HashSet<>(); - private long forkedUnSeqListPointNum = 0; - private Set<String> forkedUnSeqListDeviceSet = new HashSet<>(); + + // 
Deciding whether or not to merge; + //private boolean forkedSeqListMergeFlag = false; + //private boolean forkedUnSeqListMergeFlag = false; + private double forkedSeqListPointNum = 0; + private double forkedSeqListDeviceSize = 0; + private double forkedUnSeqListPointNum = 0; + private double forkedUnSeqListDeviceSize = 0; public LevelTsFileManagement(String storageGroupName, String storageGroupDir) { super(storageGroupName, storageGroupDir); @@ -111,7 +119,7 @@ public class LevelTsFileManagement extends TsFileManagement { File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), maxLevelNum - 1); TsFileResource targetResource = new TsFileResource(newTargetFile); List<TsFileResource> mergeFiles = new ArrayList<>(); - for (int i = currMergeFiles.size(); i >= 0; i--) { + for (int i = currMergeFiles.size() - 1; i >= 0; i--) { mergeFiles.addAll(sequenceTsFileResources.get(timePartitionId).get(i)); } HotCompactionUtils.merge(targetResource, mergeFiles, @@ -403,17 +411,68 @@ public class LevelTsFileManagement extends TsFileManagement { @Override public void forkCurrentFileList(long timePartition) { - Pair<Long, Set<String>> seqResult = forkTsFileList( + Pair<Double, Double> seqResult = forkTsFileList( forkedSequenceTsFileResources, sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources)); forkedSeqListPointNum = seqResult.left; - forkedSeqListDeviceSet = seqResult.right; - Pair<Long, Set<String>> unSeqResult = forkTsFileList( + forkedSeqListDeviceSize = seqResult.right; + Pair<Double, Double> unSeqResult = forkTsFileList( forkedUnSequenceTsFileResources, - unSequenceTsFileResources - .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources)); + unSequenceTsFileResources.computeIfAbsent(timePartition, this::newUnSequenceTsFileResources)); forkedUnSeqListPointNum = unSeqResult.left; - forkedUnSeqListDeviceSet = unSeqResult.right; + forkedUnSeqListDeviceSize = unSeqResult.right; + } + + private Pair<Double, Double> 
forkTsFileList( + List<List<TsFileResource>> forkedTsFileResources, + List rawTsFileResources) { + forkedTsFileResources.clear(); + // just fork part of the TsFile list, controlled by max_merge_chunk_point + long pointNum = 0; + // all flush to target file +// Set<String> deviceSet = new HashSet<>(); + ICardinality deviceSet = new HyperLogLog(13); + for (int i = 0; i < maxLevelNum - 1; i++) { + List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); + Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources + .get(i); + synchronized (levelRawTsFileResources) { + for (TsFileResource tsFileResource : levelRawTsFileResources) { + if (tsFileResource.isClosed()) { + Map<String, Long> chunkPointMap = FileChunkPointSizeCache.getInstance() + .get(tsFileResource.getTsFile()); + for (Entry<String, Long> deviceChunkPoint : chunkPointMap.entrySet()) { + deviceSet.offer(deviceChunkPoint.getKey()); + pointNum += deviceChunkPoint.getValue(); + } + } + if (deviceSet.cardinality() > 0 + && pointNum / deviceSet.cardinality() >= maxChunkPointNum) { + forkedLevelTsFileResources.add(tsFileResource); + break; + } + forkedLevelTsFileResources.add(tsFileResource); + } + } + + if (deviceSet.cardinality() > 0 + && pointNum / deviceSet.cardinality() >= maxChunkPointNum) { + forkedTsFileResources.add(forkedLevelTsFileResources); + break; + } + forkedTsFileResources.add(forkedLevelTsFileResources); + //System.out.println(forkedLevelTsFileResources.size()); + //System.out.println(forkedTsFileResources.get(i).size()); + } + + // fill in empty file + while (forkedTsFileResources.size() < maxLevelNum) { + List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>(); + forkedTsFileResources.add(emptyForkedLevelTsFileResources); + } + + //return forkedTsFileResources.size() > 1; + return new Pair<>((double)pointNum, (double)deviceSet.cardinality()); } // private Pair<Long, Set<String>> forkTsFileList( @@ -431,52 +490,15 @@ public 
class LevelTsFileManagement extends TsFileManagement { // synchronized (levelRawTsFileResources) { // for (TsFileResource tsFileResource : levelRawTsFileResources) { // if (tsFileResource.isClosed()) { -// Map<String, Long> chunkPointMap = FileChunkPointSizeCache.getInstance() -// .get(tsFileResource.getTsFile()); -// for (Entry<String, Long> deviceChunkPoint : chunkPointMap.entrySet()) { -// deviceSet.add(deviceChunkPoint.getKey()); -// pointNum += deviceChunkPoint.getValue(); -// } -// } -// if (deviceSet.size() > 0 -// && pointNum / deviceSet.size() >= maxChunkPointNum) { -// break; +// forkedLevelTsFileResources.add(tsFileResource); // } // } // } -// if (deviceSet.size() > 0 -// && pointNum / deviceSet.size() >= maxChunkPointNum) { -// break; -// } // forkedTsFileResources.add(forkedLevelTsFileResources); // } // return new Pair<>(pointNum, deviceSet); // } - private Pair<Long, Set<String>> forkTsFileList( - List<List<TsFileResource>> forkedTsFileResources, - List rawTsFileResources) { - forkedTsFileResources.clear(); - // just fork part of the TsFile list, controlled by max_merge_chunk_point - long pointNum = 0; - // all flush to target file - Set<String> deviceSet = new HashSet<>(); - for (int i = 0; i < maxLevelNum - 1; i++) { - List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>(); - Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources - .get(i); - synchronized (levelRawTsFileResources) { - for (TsFileResource tsFileResource : levelRawTsFileResources) { - if (tsFileResource.isClosed()) { - forkedLevelTsFileResources.add(tsFileResource); - } - } - } - forkedTsFileResources.add(forkedLevelTsFileResources); - } - return new Pair<>(pointNum, deviceSet); - } - @Override protected void merge(long timePartition) { merge(forkedSequenceTsFileResources, true, timePartition); @@ -489,21 +511,22 @@ public class LevelTsFileManagement extends TsFileManagement { long startTimeMillis = 
System.currentTimeMillis(); try { logger.info("{} start to filter hot compaction condition", storageGroupName); - long pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; - Set<String> deviceSet = - sequence ? forkedSeqListDeviceSet : forkedUnSeqListDeviceSet; -// logger -// .info("{} current sg subLevel point num: {}, device num: {}", storageGroupName, pointNum, -// deviceSet.size()); + double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum; + double deviceSize = + sequence ? forkedSeqListDeviceSize : forkedUnSeqListDeviceSize; + //boolean mergeFlag = sequence ? forkedSeqListMergeFlag : forkedUnSeqListMergeFlag; + + logger + .info("{} current sg subLevel point num: {}, approximate device num: {}", storageGroupName, pointNum, + deviceSize); HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir, storageGroupName); -// if (deviceSet.size() > 0 && pointNum / deviceSet.size() > IoTDBDescriptor.getInstance() -// .getConfig().getMergeChunkPointNumberThreshold()) { -// // merge all tsfile to last level -// logger.info("{} merge {} level tsfiles to next level", storageGroupName, -// mergeResources.size()); -// flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); -// } else { + if (deviceSize > 0 && pointNum / deviceSize >= maxChunkPointNum) { + // merge all tsfile to last level + logger.info("{} merge {} level tsfiles to next level", storageGroupName, + mergeResources.size()); + flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence); + } else { for (int i = 0; i < maxLevelNum - 1; i++) { if (maxFileNumInEachLevel <= mergeResources.get(i).size()) { for (TsFileResource mergeResource : mergeResources.get(i)) { @@ -537,7 +560,7 @@ public class LevelTsFileManagement extends TsFileManagement { } } } -// } + } hotCompactionLogger.close(); File logFile = FSFactoryProducer.getFSFactory() .getFile(storageGroupDir, storageGroupName + 
HOT_COMPACTION_LOG_NAME);
