This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch fix-cannot-select-files-when-seq-empty in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 60869a0a5b9ea9db5471787a565833154a78661d Author: Liu Xuxin <[email protected]> AuthorDate: Sat May 20 10:44:21 2023 +0800 [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892) --- .../compaction/schedule/CompactionScheduler.java | 49 ++++++++++++++++------ .../compaction/schedule/CompactionTaskManager.java | 1 + .../iotdb/db/engine/storagegroup/DataRegion.java | 12 ++++-- .../db/engine/storagegroup/TsFileManager.java | 1 + .../SizeTieredCompactionSelectorTest.java | 38 +++++++++++++++++ 5 files changed, 84 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java index 97a12af6707..5c3f1f80269 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java @@ -52,26 +52,37 @@ public class CompactionScheduler { LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) { + /** + * Select compaction task and submit them to CompactionTaskManager. + * + * @param tsFileManager tsfileManager that contains source files + * @param timePartition the time partition to execute the selection + * @return the count of submitted task + */ + public static int scheduleCompaction(TsFileManager tsFileManager, long timePartition) { if (!tsFileManager.isAllowCompaction()) { - return; + return 0; } + // the name of this variable is trySubmitCount, because the task submitted to the queue could be + // evicted due to the low priority of the task + int trySubmitCount = 0; try { - tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition); - tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true); - tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false); + trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition); + trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true); + trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false); } catch (InterruptedException e) { LOGGER.error("Exception occurs when selecting compaction tasks", e); Thread.currentThread().interrupt(); } + return trySubmitCount; } - public static void tryToSubmitInnerSpaceCompactionTask( + public static int tryToSubmitInnerSpaceCompactionTask( TsFileManager tsFileManager, long timePartition, boolean sequence) throws InterruptedException { if ((!config.isEnableSeqSpaceCompaction() && sequence) || (!config.isEnableUnseqSpaceCompaction() && !sequence)) { - return; + return 0; } String storageGroupName = tsFileManager.getStorageGroupName(); @@ -94,6 +105,9 @@ public class CompactionScheduler { sequence ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition) : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition)); + // the name of this variable is trySubmitCount, because the task submitted to the queue could be + // evicted due to the low priority of the task + int trySubmitCount = 0; for (List<TsFileResource> task : taskList) { ICompactionPerformer performer = sequence @@ -105,7 +119,7 @@ public class CompactionScheduler { .getConfig() .getInnerUnseqCompactionPerformer() .createInstance(); - CompactionTaskManager.getInstance() + if (CompactionTaskManager.getInstance() .addTaskToWaitingQueue( new InnerSpaceCompactionTask( timePartition, @@ -114,14 +128,17 @@ public class CompactionScheduler { sequence, performer, CompactionTaskManager.currentTaskNum, - tsFileManager.getNextCompactionTaskId())); + tsFileManager.getNextCompactionTaskId()))) { + trySubmitCount++; + } } + return trySubmitCount; } - private static void tryToSubmitCrossSpaceCompactionTask( + private static int tryToSubmitCrossSpaceCompactionTask( TsFileManager tsFileManager, long timePartition) throws InterruptedException { if (!config.isEnableCrossSpaceCompaction()) { - return; + return 0; } String logicalStorageGroupName = tsFileManager.getStorageGroupName(); String dataRegionId = tsFileManager.getDataRegionId(); @@ -137,8 +154,11 @@ public class CompactionScheduler { taskList.stream() .map(CrossCompactionTaskResource::getTotalMemoryCost) .collect(Collectors.toList()); + // the name of this variable is trySubmitCount, because the task submitted to the queue could be + // evicted due to the low priority of the task + int trySubmitCount = 0; for (int i = 0, size = taskList.size(); i < size; ++i) { - CompactionTaskManager.getInstance() + if (CompactionTaskManager.getInstance() .addTaskToWaitingQueue( new CrossSpaceCompactionTask( timePartition, @@ -151,7 +171,10 @@ public class CompactionScheduler { .createInstance(), CompactionTaskManager.currentTaskNum, memoryCost.get(i), - tsFileManager.getNextCompactionTaskId())); + tsFileManager.getNextCompactionTaskId()))) { + trySubmitCount++; + } } + return trySubmitCount; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java index 158cdaa34f6..024b02fc9bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java @@ -71,6 +71,7 @@ public class CompactionTaskManager implements IService { private WrappedThreadPoolExecutor subCompactionTaskExecutionPool; public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0); + private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue = new FixedPriorityBlockingQueue<>( config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index a5a3d7e20be..4b4d4cbd393 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -2187,17 +2187,21 @@ public class DataRegion implements IDataRegionForQuery { logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId); } - protected void executeCompaction() { + protected int executeCompaction() { + // the name of this variable is trySubmitCount, because the task submitted to the queue could be + // evicted due to the low priority of the task + int trySubmitCount = 0; try { List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions()); // sort the time partition from largest to smallest timePartitions.sort(Comparator.reverseOrder()); for (long timePartition : timePartitions) { - CompactionScheduler.scheduleCompaction(tsFileManager, timePartition); + trySubmitCount += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition); } } catch (Throwable e) { logger.error("Meet error in compaction schedule.", e); } + return trySubmitCount; } /** @@ -2328,10 +2332,10 @@ public class DataRegion implements IDataRegionForQuery { } /** merge file under this database processor */ - public void compact() { + public int compact() { writeLock("merge"); try { - executeCompaction(); + return executeCompaction(); } finally { writeUnlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java index 86fa5e742e0..c8ff4534099 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java @@ -346,6 +346,7 @@ public class TsFileManager { readLock(); try { Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet()); + timePartitions.addAll(unsequenceFiles.keySet()); return timePartitions; } finally { readUnlock(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java index 730a4e56795..7941628e8f6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java @@ -20,14 +20,19 @@ package org.apache.iotdb.db.engine.compaction.inner.sizetiered; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.engine.storagegroup.FakedTsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import java.io.File; import java.util.ArrayList; import java.util.List; @@ -65,4 +70,37 @@ public class SizeTieredCompactionSelectorTest { .selectInnerSpaceTask(manager.getOrCreateSequenceListByTimePartition(9)) .size()); } + + @Test + public void testSubmitWhenSequenceFileIsEmpty() throws Exception { + DataRegion region = new DataRegion("root.test", "1"); + TsFileManager manager = region.getTsFileManager(); + int originCandidate = + IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum(); + IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(30); + boolean enableUnseqCompaction = + IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction(); + IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true); + CompactionTaskManager.getInstance().start(); + try { + for (int i = 1; i < 91; ++i) { + TsFileResource resource = Mockito.mock(TsFileResource.class); + Mockito.when(resource.getTimePartition()).thenReturn(0L); + Mockito.when(resource.getTsFileSize()).thenReturn(100L); + Mockito.when(resource.getTsFile()) + .thenReturn(new File(String.format("%d-%d-0-0.tsfile", i, i))); + Mockito.when(resource.getStatus()).thenReturn(TsFileResourceStatus.NORMAL); + manager.add(resource, false); + } + Assert.assertEquals(3, region.compact()); + } finally { + IoTDBDescriptor.getInstance() + .getConfig() + .setMaxInnerCompactionCandidateFileNum(originCandidate); + IoTDBDescriptor.getInstance() + .getConfig() + .setEnableUnseqSpaceCompaction(enableUnseqCompaction); + CompactionTaskManager.getInstance().shutdown(60_000L); + } + } }
