This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch rc/1.1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aaa86ca46185a307a687aa3680615d192703dd61 Author: 周沛辰 <[email protected]> AuthorDate: Thu Mar 16 10:03:05 2023 +0800 allow submitting tasks when queue is full (#9305) --- .../compaction/schedule/CompactionScheduler.java | 100 +++++++-------------- .../compaction/schedule/CompactionTaskManager.java | 3 +- .../impl/RewriteCrossSpaceCompactionSelector.java | 5 +- .../impl/SizeTieredCompactionSelector.java | 27 ++---- .../inner/InnerCompactionSchedulerTest.java | 6 +- 5 files changed, 45 insertions(+), 96 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java index d610fc5a4d..97a12af670 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -59,23 +58,26 @@ public class CompactionScheduler { } try { tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition); - tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition); + tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true); + tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false); } catch (InterruptedException e) { LOGGER.error("Exception occurs when selecting compaction tasks", e); Thread.currentThread().interrupt(); } } - private static List<List<TsFileResource>> selectInnerSpaceCompactionTask( - long timePartition, TsFileManager tsFileManager, boolean sequence) { + public static void tryToSubmitInnerSpaceCompactionTask( + TsFileManager tsFileManager, long timePartition, boolean sequence) + throws InterruptedException { if ((!config.isEnableSeqSpaceCompaction() && sequence) || (!config.isEnableUnseqSpaceCompaction() && !sequence)) { - return Collections.emptyList(); + return; } + String storageGroupName = tsFileManager.getStorageGroupName(); String dataRegionId = tsFileManager.getDataRegionId(); - ICompactionSelector innerSpaceCompactionSelector = null; + ICompactionSelector innerSpaceCompactionSelector; if (sequence) { innerSpaceCompactionSelector = config @@ -87,69 +89,35 @@ public class CompactionScheduler { .getInnerUnsequenceCompactionSelector() .createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager); } - - return innerSpaceCompactionSelector.selectInnerSpaceTask( - sequence - ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition) - : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition)); - } - - public static void tryToSubmitInnerSpaceCompactionTask( - TsFileManager tsFileManager, long timePartition) throws InterruptedException { - List<List<TsFileResource>> seqTaskList = - selectInnerSpaceCompactionTask(timePartition, tsFileManager, true); - List<List<TsFileResource>> unseqTaskList = - selectInnerSpaceCompactionTask(timePartition, tsFileManager, false); - int taskFreeSize = - config.getCandidateCompactionTaskQueueSize() - - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount(); - int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size()); - for (int i = 0; i < taskSize; i++) { - if (taskFreeSize <= 0) { - break; - } - // submit one seq inner space task - if (i < seqTaskList.size()) { - submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true); - taskFreeSize--; - } - - // submit one unseq inner space task - if (i < unseqTaskList.size()) { - submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false); - taskFreeSize--; - } + List<List<TsFileResource>> taskList = + innerSpaceCompactionSelector.selectInnerSpaceTask( + sequence + ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition) + : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition)); + for (List<TsFileResource> task : taskList) { + ICompactionPerformer performer = + sequence + ? IoTDBDescriptor.getInstance() + .getConfig() + .getInnerSeqCompactionPerformer() + .createInstance() + : IoTDBDescriptor.getInstance() + .getConfig() + .getInnerUnseqCompactionPerformer() + .createInstance(); + CompactionTaskManager.getInstance() + .addTaskToWaitingQueue( + new InnerSpaceCompactionTask( + timePartition, + tsFileManager, + task, + sequence, + performer, + CompactionTaskManager.currentTaskNum, + tsFileManager.getNextCompactionTaskId())); } } - private static void submitInnerTask( - List<TsFileResource> taskList, - TsFileManager tsFileManager, - long timePartition, - boolean sequence) - throws InterruptedException { - ICompactionPerformer performer = - sequence - ? IoTDBDescriptor.getInstance() - .getConfig() - .getInnerSeqCompactionPerformer() - .createInstance() - : IoTDBDescriptor.getInstance() - .getConfig() - .getInnerUnseqCompactionPerformer() - .createInstance(); - CompactionTaskManager.getInstance() - .addTaskToWaitingQueue( - new InnerSpaceCompactionTask( - timePartition, - tsFileManager, - taskList, - sequence, - performer, - CompactionTaskManager.currentTaskNum, - tsFileManager.getNextCompactionTaskId())); - } - private static void tryToSubmitCrossSpaceCompactionTask( TsFileManager tsFileManager, long timePartition) throws InterruptedException { if (!config.isEnableCrossSpaceCompaction()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java index d75401820d..04f2cda472 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java @@ -221,8 +221,7 @@ public class CompactionTaskManager implements IService { throws InterruptedException { if (init && !candidateCompactionTaskQueue.contains(compactionTask) - && !isTaskRunning(compactionTask) - && candidateCompactionTaskQueue.size() < config.getCandidateCompactionTaskQueueSize()) { + && !isTaskRunning(compactionTask)) { compactionTask.setSourceFilesToCompactionCandidate(); candidateCompactionTaskQueue.put(compactionTask); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index af3f383202..a6ed828827 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -208,10 +208,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector private boolean canSubmitCrossTask( List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) { - return CompactionTaskManager.getInstance().getCompactionCandidateTaskCount() - < config.getCandidateCompactionTaskQueueSize() - && !sequenceFileList.isEmpty() - && !unsequenceFileList.isEmpty(); + return !sequenceFileList.isEmpty() && !unsequenceFileList.isEmpty(); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java index c61a46485b..df88248bf2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -107,9 +107,7 @@ public class SizeTieredCompactionSelector if (currentName.getInnerCompactionCnt() != level) { // meet files of another level if (selectedFileList.size() > 1) { - if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) { - return false; - } + taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); shouldContinueToSearch = false; } selectedFileList = new ArrayList<>(); @@ -134,9 +132,7 @@ public class SizeTieredCompactionSelector || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) { // submit the task if (selectedFileList.size() > 1) { - if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) { - return false; - } + taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); shouldContinueToSearch = false; } selectedFileList = new ArrayList<>(); @@ -147,25 +143,12 @@ public class SizeTieredCompactionSelector // if next time partition exists // submit a merge task even it does not meet the requirement for file num or file size if (hasNextTimePartition && selectedFileList.size() > 1) { - addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize); + taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); shouldContinueToSearch = false; } return shouldContinueToSearch; } - private boolean addOneTaskToQueue( - PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue, - List<TsFileResource> selectedFileList, - long selectedFileSize) { - if (CompactionTaskManager.getInstance().getCompactionCandidateTaskCount() - + taskPriorityQueue.size() - < config.getCandidateCompactionTaskQueueSize()) { - taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize)); - return true; - } - return false; - } - /** * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If * there are more than a batch of files to be merged on a certain layer, it does not search to @@ -223,9 +206,11 @@ public class SizeTieredCompactionSelector TsFileNameGenerator.TsFileName fileNameOfO2 = TsFileNameGenerator.getTsFileName(resourceOfO2.getTsFile().getName()); if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) { + // the higher the inner compaction count, the higher the priority is return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt(); } - return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion()); + // the larger the version number, the higher the priority is + return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion()); } catch (IOException e) { return 0; } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java index 99a7e98190..d1f907fa34 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java @@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest { TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp"); tsFileManager.addAll(seqResources, true); - CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L); + CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true); try { Thread.sleep(5000); } catch (Exception e) { @@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest { seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING); TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp"); tsFileManager.addAll(seqResources, true); - CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L); + CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true); long waitingTime = 0; while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) { @@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest { seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED); TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp"); tsFileManager.addAll(seqResources, true); - CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L); + CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true); long waitingTime = 0; while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) { try {
