This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_refine in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit edc7244f43d7eda4e369ea7cc2b01c223be3c902 Author: Jinrui.Zhang <[email protected]> AuthorDate: Sat Dec 17 00:23:56 2022 +0800 tmp save --- .../inner/sizetiered/InnerCompactionCandidate.java | 52 +++++++++++ .../sizetiered/SizeTieredCompactionSelector.java | 103 +++++++++++---------- 2 files changed, 106 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/InnerCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/InnerCompactionCandidate.java new file mode 100644 index 0000000000..4ccd475175 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/InnerCompactionCandidate.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.compaction.inner.sizetiered; + +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; + +import java.util.ArrayList; +import java.util.List; + +public class InnerCompactionCandidate { + private List<TsFileResource> tsFileResources; + private long totalFileSize; + + public InnerCompactionCandidate() { + this.tsFileResources = new ArrayList<>(); + this.totalFileSize = 0L; + } + + public void addTsFileResource(TsFileResource resource) { + this.tsFileResources.add(resource); + totalFileSize += resource.getTsFileSize(); + } + + public int getFileCount() { + return tsFileResources.size(); + } + + public long getTotalFileSize() { + return totalFileSize; + } + + public List<TsFileResource> getTsFileResources() { + return tsFileResources; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java index d8430029c9..f8d60a2fe1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java @@ -59,7 +59,6 @@ public class SizeTieredCompactionSelector protected String storageGroupName; protected String dataRegionId; protected long timePartition; - protected List<TsFileResource> tsFileResources; protected boolean sequence; protected TsFileManager tsFileManager; protected boolean hasNextTimePartition; @@ -87,64 +86,54 @@ public class SizeTieredCompactionSelector * longer search for higher layers), otherwise it will return true. * * @param level the level to be searched - * @param taskPriorityQueue it stores the batches of files to be compacted and the total size of * each batch * @return return whether to continue the search to higher levels * @throws IOException */ - private boolean selectLevelTask( - int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue) + private List<InnerCompactionCandidate> selectLevelTask(List<TsFileResource> tsFileResources, int level) throws IOException { - boolean shouldContinueToSearch = true; - List<TsFileResource> selectedFileList = new ArrayList<>(); - long selectedFileSize = 0L; - long targetCompactionFileSize = config.getTargetCompactionFileSize(); - + List<InnerCompactionCandidate> result = new ArrayList<>(); + //sort TsFileResources by inner level in ascending +// tsFileResources.sort(new TsFileResourceInnerLevelComparator()); + InnerCompactionCandidate candidate = new InnerCompactionCandidate(); for (TsFileResource currentFile : tsFileResources) { - TsFileNameGenerator.TsFileName currentName = - TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName()); - if (currentName.getInnerCompactionCnt() != level) { - if (selectedFileList.size() > 1) { - taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); - shouldContinueToSearch = false; - } - selectedFileList = new ArrayList<>(); - selectedFileSize = 0L; + if (tsFileShouldBeSkipped(currentFile, level)) { continue; } + // 为什么 ? if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) { - selectedFileList.clear(); - selectedFileSize = 0L; + candidate = new InnerCompactionCandidate(); continue; } - LOGGER.debug("Current File is {}, size is {}", currentFile, currentFile.getTsFileSize()); - selectedFileList.add(currentFile); - selectedFileSize += currentFile.getTsFileSize(); - LOGGER.debug( - "Add tsfile {}, current select file num is {}, size is {}", - currentFile, - selectedFileList.size(), - selectedFileSize); + LOGGER.debug("file added. File is {}, size is {}", currentFile, currentFile.getTsFileSize()); + candidate.addTsFileResource(currentFile); + // if the file size or file num reach threshold - if (selectedFileSize >= targetCompactionFileSize - || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) { + if (candidateSatisfied(candidate)) { // submit the task - if (selectedFileList.size() > 1) { - taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); - shouldContinueToSearch = false; + if (candidate.getFileCount() > 1) { + result.add(candidate); } - selectedFileList = new ArrayList<>(); - selectedFileSize = 0L; + candidate = new InnerCompactionCandidate(); } } // if next time partition exists // submit a merge task even it does not meet the requirement for file num or file size - if (hasNextTimePartition && selectedFileList.size() > 1) { - taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); - shouldContinueToSearch = false; + if (hasNextTimePartition && candidate.getTotalFileSize() > 1) { + result.add(candidate); } - return shouldContinueToSearch; + return result; + } + + private boolean candidateSatisfied(InnerCompactionCandidate candidate) { + return candidate.getFileCount() >= config.getTargetCompactionFileSize() || candidate.getTotalFileSize() >= config.getMaxInnerCompactionCandidateFileNum(); + } + + private boolean tsFileShouldBeSkipped(TsFileResource tsFileResource, int level) throws IOException { + TsFileNameGenerator.TsFileName currentFileName = + TsFileNameGenerator.getTsFileName(tsFileResource.getTsFile().getName()); + return currentFileName.getInnerCompactionCnt() != level; } /** @@ -157,19 +146,20 @@ public class SizeTieredCompactionSelector */ @Override public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFileResources) { - this.tsFileResources = tsFileResources; - PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue = + PriorityQueue<InnerCompactionCandidate> taskPriorityQueue = new PriorityQueue<>(new SizeTieredCompactionTaskComparator()); try { - int maxLevel = searchMaxFileLevel(); + int maxLevel = searchMaxFileLevel(tsFileResources); for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) { - if (!selectLevelTask(currentLevel, taskPriorityQueue)) { + List<InnerCompactionCandidate> candidates = selectLevelTask(tsFileResources, currentLevel); + if (candidates.size() > 0) { + taskPriorityQueue.addAll(candidates); break; } } List<List<TsFileResource>> taskList = new LinkedList<>(); while (taskPriorityQueue.size() > 0) { - List<TsFileResource> resources = taskPriorityQueue.poll().left; + List<TsFileResource> resources = taskPriorityQueue.poll().getTsFileResources(); taskList.add(resources); } return taskList; @@ -179,7 +169,7 @@ public class SizeTieredCompactionSelector return Collections.emptyList(); } - private int searchMaxFileLevel() throws IOException { + private int searchMaxFileLevel(List<TsFileResource> tsFileResources) throws IOException { int maxLevel = -1; for (TsFileResource currentFile : tsFileResources) { TsFileNameGenerator.TsFileName currentName = @@ -192,12 +182,12 @@ public class SizeTieredCompactionSelector } private class SizeTieredCompactionTaskComparator - implements Comparator<Pair<List<TsFileResource>, Long>> { + implements Comparator<InnerCompactionCandidate> { @Override - public int compare(Pair<List<TsFileResource>, Long> o1, Pair<List<TsFileResource>, Long> o2) { - TsFileResource resourceOfO1 = o1.left.get(0); - TsFileResource resourceOfO2 = o2.left.get(0); + public int compare(InnerCompactionCandidate o1, InnerCompactionCandidate o2) { + TsFileResource resourceOfO1 = o1.getTsFileResources().get(0); + TsFileResource resourceOfO2 = o2.getTsFileResources().get(0); try { TsFileNameGenerator.TsFileName fileNameOfO1 = TsFileNameGenerator.getTsFileName(resourceOfO1.getTsFile().getName()); @@ -212,4 +202,19 @@ public class SizeTieredCompactionSelector } } } + + private class TsFileResourceInnerLevelComparator implements Comparator<TsFileResource> { + @Override + public int compare(TsFileResource o1, TsFileResource o2) { + try { + TsFileNameGenerator.TsFileName o1Name = + TsFileNameGenerator.getTsFileName(o1.getTsFile().getName()); + TsFileNameGenerator.TsFileName o2Name = + TsFileNameGenerator.getTsFileName(o2.getTsFile().getName()); + return o1Name.getInnerCompactionCnt() - o2Name.getInnerCompactionCnt(); + } catch (IOException e) { + return 0; + } + } + } }
