This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch fix-cannot-select-files-when-seq-empty in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64cf1a314f70aa2547130379cd5309c5065c5340 Author: Liu Xuxin <[email protected]> AuthorDate: Fri May 19 16:13:18 2023 +0800 add test --- .../compaction/schedule/CompactionScheduler.java | 43 +++++++++++++++------- .../compaction/schedule/CompactionTaskManager.java | 5 +++ .../iotdb/db/engine/storagegroup/DataRegion.java | 10 +++-- .../db/engine/storagegroup/TsFileManager.java | 1 + .../SizeTieredCompactionSelectorTest.java | 38 +++++++++++++++++++ 5 files changed, 80 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java index 97a12af6707..387c1cf3e9e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java @@ -52,26 +52,35 @@ public class CompactionScheduler { LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) { + /** + * Select compaction task and submit them to CompactionTaskManager + * + * @param tsFileManager tsfileManager that contains source files + * @param timePartition the time partition to execute the selection + * @return the count of submitted task + */ + public static int scheduleCompaction(TsFileManager tsFileManager, long timePartition) { if (!tsFileManager.isAllowCompaction()) { - return; + return 0; } + int submitCount = 0; try { - tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition); - tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true); - tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false); + submitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition); + submitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true); + submitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false); } catch (InterruptedException e) { LOGGER.error("Exception occurs when selecting compaction tasks", e); Thread.currentThread().interrupt(); } + return submitCount; } - public static void tryToSubmitInnerSpaceCompactionTask( + public static int tryToSubmitInnerSpaceCompactionTask( TsFileManager tsFileManager, long timePartition, boolean sequence) throws InterruptedException { if ((!config.isEnableSeqSpaceCompaction() && sequence) || (!config.isEnableUnseqSpaceCompaction() && !sequence)) { - return; + return 0; } String storageGroupName = tsFileManager.getStorageGroupName(); @@ -94,6 +103,7 @@ public class CompactionScheduler { sequence ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition) : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition)); + int submitCount = 0; for (List<TsFileResource> task : taskList) { ICompactionPerformer performer = sequence @@ -105,7 +115,7 @@ public class CompactionScheduler { .getConfig() .getInnerUnseqCompactionPerformer() .createInstance(); - CompactionTaskManager.getInstance() + if (CompactionTaskManager.getInstance() .addTaskToWaitingQueue( new InnerSpaceCompactionTask( timePartition, @@ -114,15 +124,19 @@ public class CompactionScheduler { sequence, performer, CompactionTaskManager.currentTaskNum, - tsFileManager.getNextCompactionTaskId())); + tsFileManager.getNextCompactionTaskId()))) { + submitCount++; + } } + return submitCount; } - private static void tryToSubmitCrossSpaceCompactionTask( + private static int tryToSubmitCrossSpaceCompactionTask( TsFileManager tsFileManager, long timePartition) throws InterruptedException { if (!config.isEnableCrossSpaceCompaction()) { - return; + return 0; } + int submitCount = 0; String logicalStorageGroupName = tsFileManager.getStorageGroupName(); String dataRegionId = tsFileManager.getDataRegionId(); ICrossSpaceSelector crossSpaceCompactionSelector = @@ -138,7 +152,7 @@ public class CompactionScheduler { .map(CrossCompactionTaskResource::getTotalMemoryCost) .collect(Collectors.toList()); for (int i = 0, size = taskList.size(); i < size; ++i) { - CompactionTaskManager.getInstance() + if (CompactionTaskManager.getInstance() .addTaskToWaitingQueue( new CrossSpaceCompactionTask( timePartition, @@ -151,7 +165,10 @@ public class CompactionScheduler { .createInstance(), CompactionTaskManager.currentTaskNum, memoryCost.get(i), - tsFileManager.getNextCompactionTaskId())); + tsFileManager.getNextCompactionTaskId()))) { + submitCount++; + } } + return submitCount; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java index 158cdaa34f6..5814731a3c9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java @@ -71,6 +71,7 @@ public class CompactionTaskManager implements IService { private WrappedThreadPoolExecutor subCompactionTaskExecutionPool; public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0); + private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue = new FixedPriorityBlockingQueue<>( config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl()); @@ -456,4 +457,8 @@ public class CompactionTaskManager implements IService { } return storageGroupTasks.get(regionWithSG).get(task); } + + public FixedPriorityBlockingQueue<AbstractCompactionTask> getCandidateCompactionTaskQueue() { + return candidateCompactionTaskQueue; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 42772794dfd..185a2794194 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -2147,17 +2147,19 @@ public class DataRegion implements IDataRegionForQuery { logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId); } - protected void executeCompaction() { + protected int executeCompaction() { + int submittedTask = 0; try { List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions()); // sort the time partition from largest to smallest timePartitions.sort(Comparator.reverseOrder()); for (long timePartition : timePartitions) { - CompactionScheduler.scheduleCompaction(tsFileManager, timePartition); + submittedTask += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition); } } catch (Throwable e) { logger.error("Meet error in compaction schedule.", e); } + return submittedTask; } /** @@ -2288,10 +2290,10 @@ public class DataRegion implements IDataRegionForQuery { } /** merge file under this database processor */ - public void compact() { + public int compact() { writeLock("merge"); try { - executeCompaction(); + return executeCompaction(); } finally { writeUnlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java index 86fa5e742e0..c8ff4534099 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java @@ -346,6 +346,7 @@ public class TsFileManager { readLock(); try { Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet()); + timePartitions.addAll(unsequenceFiles.keySet()); return timePartitions; } finally { readUnlock(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java index 730a4e56795..7941628e8f6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java @@ -20,14 +20,19 @@ package org.apache.iotdb.db.engine.compaction.inner.sizetiered; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.engine.storagegroup.FakedTsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import java.io.File; import java.util.ArrayList; import java.util.List; @@ -65,4 +70,37 @@ public class SizeTieredCompactionSelectorTest { .selectInnerSpaceTask(manager.getOrCreateSequenceListByTimePartition(9)) .size()); } + + @Test + public void testSubmitWhenSequenceFileIsEmpty() throws Exception { + DataRegion region = new DataRegion("root.test", "1"); + TsFileManager manager = region.getTsFileManager(); + int originCandidate = + IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum(); + IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(30); + boolean enableUnseqCompaction = + IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction(); + IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true); + CompactionTaskManager.getInstance().start(); + try { + for (int i = 1; i < 91; ++i) { + TsFileResource resource = Mockito.mock(TsFileResource.class); + Mockito.when(resource.getTimePartition()).thenReturn(0L); + Mockito.when(resource.getTsFileSize()).thenReturn(100L); + Mockito.when(resource.getTsFile()) + .thenReturn(new File(String.format("%d-%d-0-0.tsfile", i, i))); + Mockito.when(resource.getStatus()).thenReturn(TsFileResourceStatus.NORMAL); + manager.add(resource, false); + } + Assert.assertEquals(3, region.compact()); + } finally { + IoTDBDescriptor.getInstance() + .getConfig() + .setMaxInnerCompactionCandidateFileNum(originCandidate); + IoTDBDescriptor.getInstance() + .getConfig() + .setEnableUnseqSpaceCompaction(enableUnseqCompaction); + CompactionTaskManager.getInstance().shutdown(60_000L); + } + } }
