This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fde91579d17 Abort old task when compaction config is modified (#12611)
fde91579d17 is described below
commit fde91579d17be22904e6dc106757d0680147154e
Author: shuwenwei <[email protected]>
AuthorDate: Wed May 29 19:07:43 2024 +0800
Abort old task when compaction config is modified (#12611)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 +++----
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 77 +++++++++++++++++-----
.../execute/task/AbstractCompactionTask.java | 11 ++++
.../execute/task/CrossSpaceCompactionTask.java | 19 ++++++
.../execute/task/InnerSpaceCompactionTask.java | 21 ++++++
.../compaction/schedule/CompactionScheduler.java | 34 ++++++----
.../compaction/schedule/CompactionTaskManager.java | 13 +++-
.../compaction/schedule/CompactionTaskQueue.java | 2 +
.../compaction/CompactionOverlapCheckTest.java | 6 ++
.../FastInnerCompactionPerformerTest.java | 7 ++
.../resources/conf/iotdb-common.properties | 2 +-
11 files changed, 170 insertions(+), 46 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 90afe2cc395..111979a07c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -429,13 +429,13 @@ public class IoTDBConfig {
private int avgSeriesPointNumberThreshold = 100000;
/** Enable inner space compaction for sequence files */
- private boolean enableSeqSpaceCompaction = true;
+ private volatile boolean enableSeqSpaceCompaction = true;
/** Enable inner space compaction for unsequence files */
- private boolean enableUnseqSpaceCompaction = true;
+ private volatile boolean enableUnseqSpaceCompaction = true;
/** Compact the unsequence files into the overlapped sequence files */
- private boolean enableCrossSpaceCompaction = true;
+ private volatile boolean enableCrossSpaceCompaction = true;
/** The buffer for sort operation */
private long sortBufferSize = 1024 * 1024L;
@@ -502,22 +502,22 @@ public class IoTDBConfig {
private long compactionAcquireWriteLockTimeout = 60_000L;
/** The max candidate file num in one inner space compaction task */
- private int fileLimitPerInnerTask = 30;
+ private volatile int fileLimitPerInnerTask = 30;
/** The max candidate file num in one cross space compaction task */
- private int fileLimitPerCrossTask = 500;
+ private volatile int fileLimitPerCrossTask = 500;
/** The max candidate file num in compaction */
- private int totalFileLimitForCompactionTask = 5000;
+ private volatile int totalFileLimitForCompactionTask = 5000;
/** The max total size of candidate files in one cross space compaction task
*/
- private long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 1024 * 5L;
+ private volatile long maxCrossCompactionCandidateFileSize = 1024 * 1024 *
1024 * 5L;
/**
* Only the unseq files whose level of inner space compaction reaches this
value can be selected
* to participate in the cross space compaction.
*/
- private int minCrossCompactionUnseqFileLevel = 1;
+ private volatile int minCrossCompactionUnseqFileLevel = 1;
/** The interval of compaction task schedulation in each virtual database.
The unit is ms. */
private long compactionScheduleIntervalInMs = 60_000L;
@@ -534,23 +534,23 @@ public class IoTDBConfig {
/** The number of threads to be set up to select compaction task. */
private int compactionScheduleThreadNum = 4;
- private boolean enableTsFileValidation = false;
+ private volatile boolean enableTsFileValidation = false;
/** The size of candidate compaction task queue. */
- private int candidateCompactionTaskQueueSize = 200;
+ private int candidateCompactionTaskQueueSize = 50;
/**
* When the size of the mods file corresponding to TsFile exceeds this
value, inner compaction
* tasks containing mods files are selected first.
*/
- private long innerCompactionTaskSelectionModsFileThreshold = 10 * 1024 *
1024L;
+ private volatile long innerCompactionTaskSelectionModsFileThreshold = 10 *
1024 * 1024L;
/**
* When disk availability is lower than the sum of
(disk_space_warning_threshold +
* inner_compaction_task_selection_disk_redundancy), inner compaction tasks
containing mods files
* are selected first.
*/
- private double innerCompactionTaskSelectionDiskRedundancy = 0.05;
+ private volatile double innerCompactionTaskSelectionDiskRedundancy = 0.05;
/** The size of global compaction estimation file info cahce. */
private int globalCompactionFileInfoCacheSize = 1000;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 659026f8e24..c7d0613ac9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1147,6 +1147,10 @@ public class IoTDBDescriptor {
}
private void loadCompactionHotModifiedProps(Properties properties) throws
InterruptedException {
+ boolean compactionTaskConfigHotModified =
loadCompactionTaskHotModifiedProps(properties);
+ if (compactionTaskConfigHotModified) {
+ CompactionTaskManager.getInstance().incrCompactionConfigVersion();
+ }
// hot load compaction schedule task manager configurations
int compactionScheduleThreadNum =
Integer.parseInt(
@@ -1165,69 +1169,118 @@ public class IoTDBDescriptor {
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
}
+
// hot load compaction rate limit configurations
+ CompactionTaskManager.getInstance()
+
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
+ CompactionTaskManager.getInstance()
+
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
+ CompactionTaskManager.getInstance()
+ .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
+ }
+ private boolean loadCompactionTaskHotModifiedProps(Properties properties) {
+ boolean configModified = false;
// update merge_write_throughput_mb_per_sec
+ int compactionWriteThroughput =
conf.getCompactionWriteThroughputMbPerSec();
conf.setCompactionWriteThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_write_throughput_mb_per_sec",
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ configModified |= compactionWriteThroughput !=
conf.getCompactionWriteThroughputMbPerSec();
// update compaction_read_operation_per_sec
+ int compactionReadOperation = conf.getCompactionReadOperationPerSec();
conf.setCompactionReadOperationPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_read_operation_per_sec",
Integer.toString(conf.getCompactionReadOperationPerSec()))));
+ configModified |= compactionReadOperation !=
conf.getCompactionReadOperationPerSec();
// update compaction_read_throughput_mb_per_sec
+ int compactionReadThroughput = conf.getCompactionReadThroughputMbPerSec();
conf.setCompactionReadThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_read_throughput_mb_per_sec",
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+ configModified |= compactionReadThroughput !=
conf.getCompactionReadThroughputMbPerSec();
// update max_inner_compaction_candidate_file_num
+ int maxInnerCompactionCandidateFileNum = conf.getFileLimitPerInnerTask();
conf.setFileLimitPerInnerTask(
Integer.parseInt(
properties.getProperty(
"max_inner_compaction_candidate_file_num",
Integer.toString(conf.getFileLimitPerInnerTask()))));
+ configModified |= maxInnerCompactionCandidateFileNum !=
conf.getFileLimitPerInnerTask();
// update target_compaction_file_size
+ long targetCompactionFilesize = conf.getTargetCompactionFileSize();
conf.setTargetCompactionFileSize(
Long.parseLong(
properties.getProperty(
"target_compaction_file_size",
Long.toString(conf.getTargetCompactionFileSize()))));
+ configModified |= targetCompactionFilesize !=
conf.getTargetCompactionFileSize();
// update max_cross_compaction_candidate_file_num
+ int maxCrossCompactionCandidateFileNum = conf.getFileLimitPerCrossTask();
conf.setFileLimitPerCrossTask(
Integer.parseInt(
properties.getProperty(
"max_cross_compaction_candidate_file_num",
Integer.toString(conf.getFileLimitPerCrossTask()))));
+ configModified |= maxCrossCompactionCandidateFileNum !=
conf.getFileLimitPerCrossTask();
// update max_cross_compaction_candidate_file_size
+ long maxCrossCompactionCandidateFileSize =
conf.getMaxCrossCompactionCandidateFileSize();
conf.setMaxCrossCompactionCandidateFileSize(
Long.parseLong(
properties.getProperty(
"max_cross_compaction_candidate_file_size",
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
+ configModified |=
+ maxCrossCompactionCandidateFileSize !=
conf.getMaxCrossCompactionCandidateFileSize();
// update min_cross_compaction_unseq_file_level
+ int minCrossCompactionCandidateFileNum =
conf.getMinCrossCompactionUnseqFileLevel();
conf.setMinCrossCompactionUnseqFileLevel(
Integer.parseInt(
properties.getProperty(
"min_cross_compaction_unseq_file_level",
Integer.toString(conf.getMinCrossCompactionUnseqFileLevel()))));
+ configModified |=
+ minCrossCompactionCandidateFileNum !=
conf.getMinCrossCompactionUnseqFileLevel();
- CompactionTaskManager.getInstance()
-
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
- CompactionTaskManager.getInstance()
-
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
- CompactionTaskManager.getInstance()
- .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
+ // update inner_compaction_task_selection_disk_redundancy
+ double innerCompactionTaskSelectionDiskRedundancy =
+ conf.getInnerCompactionTaskSelectionDiskRedundancy();
+ conf.setInnerCompactionTaskSelectionDiskRedundancy(
+ Double.parseDouble(
+ properties.getProperty(
+ "inner_compaction_task_selection_disk_redundancy",
+
Double.toString(conf.getInnerCompactionTaskSelectionDiskRedundancy()))));
+ configModified |=
+ (Math.abs(
+ innerCompactionTaskSelectionDiskRedundancy
+ - conf.getInnerCompactionTaskSelectionDiskRedundancy())
+ > 0.001);
+
+ // update inner_compaction_task_selection_mods_file_threshold
+ long innerCompactionTaskSelectionModsFileThreshold =
+ conf.getInnerCompactionTaskSelectionModsFileThreshold();
+ conf.setInnerCompactionTaskSelectionModsFileThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "inner_compaction_task_selection_mods_file_threshold",
+
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
+ configModified |=
+ innerCompactionTaskSelectionModsFileThreshold
+ != conf.getInnerCompactionTaskSelectionModsFileThreshold();
+
+ return configModified;
}
private boolean loadCompactionThreadCountHotModifiedProps(Properties
properties) {
@@ -1298,18 +1351,6 @@ public class IoTDBDescriptor {
conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
-
- conf.setInnerCompactionTaskSelectionDiskRedundancy(
- Double.parseDouble(
- properties.getProperty(
- "inner_compaction_task_selection_disk_redundancy",
-
Double.toString(conf.getInnerCompactionTaskSelectionDiskRedundancy()))));
-
- conf.setInnerCompactionTaskSelectionModsFileThreshold(
- Long.parseLong(
- properties.getProperty(
- "inner_compaction_task_selection_mods_file_threshold",
-
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
}
private void loadWALHotModifiedProps(Properties properties) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 2188ae7da5f..15c82f26417 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -81,6 +81,7 @@ public abstract class AbstractCompactionTask {
private boolean memoryAcquired = false;
private boolean fileHandleAcquired = false;
+ protected long compactionConfigVersion = Long.MAX_VALUE;
protected AbstractCompactionTask(
String storageGroupName,
@@ -114,6 +115,16 @@ public abstract class AbstractCompactionTask {
public abstract List<TsFileResource> getAllSourceTsFiles();
+ public long getCompactionConfigVersion() {
+ // This parameter should not take effect by default unless it is
overridden by a subclass
+ return Long.MAX_VALUE;
+ }
+
+ public void setCompactionConfigVersion(long compactionConfigVersion) {
+ // This parameter should not take effect by default unless it is
overridden by a subclass
+ this.compactionConfigVersion = Long.MAX_VALUE;
+ }
+
/**
* This method will try to set the files to COMPACTION_CANDIDATE. If failed,
it should roll back
* all status to original value
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 2c6aa02b9b0..99aad09f47b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
@@ -31,6 +32,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -131,6 +133,13 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
if (!tsFileManager.isAllowCompaction()) {
return true;
}
+ if
(!IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction()) {
+ return true;
+ }
+ if (compactionConfigVersion
+ <
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion()) {
+ return true;
+ }
long startTime = System.currentTimeMillis();
targetTsfileResourceList =
TsFileNameGenerator.getCrossCompactionTargetFileResources(selectedSequenceFiles);
@@ -439,4 +448,14 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
this.summary = new CompactionTaskSummary();
}
}
+
+ @Override
+ public long getCompactionConfigVersion() {
+ return this.compactionConfigVersion;
+ }
+
+ @Override
+ public void setCompactionConfigVersion(long compactionConfigVersion) {
+ this.compactionConfigVersion = Math.min(this.compactionConfigVersion,
compactionConfigVersion);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index e4c55b8addf..b836863d2cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -33,6 +34,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
@@ -169,6 +171,15 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if (!tsFileManager.isAllowCompaction()) {
return true;
}
+ if ((sequence &&
!IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction())
+ || (!sequence
+ &&
!IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction())) {
+ return true;
+ }
+ if (this.compactionConfigVersion
+ <
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion()) {
+ return true;
+ }
long startTime = System.currentTimeMillis();
// get resource of target file
recoverMemoryStatus = true;
@@ -521,4 +532,14 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
this.summary = new CompactionTaskSummary();
}
}
+
+ @Override
+ public long getCompactionConfigVersion() {
+ return this.compactionConfigVersion;
+ }
+
+ @Override
+ public void setCompactionConfigVersion(long compactionConfigVersion) {
+ this.compactionConfigVersion = Math.min(this.compactionConfigVersion,
compactionConfigVersion);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 6ebb5578670..8d57caee8fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -140,6 +140,8 @@ public class CompactionScheduler {
String storageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();
+ long compactionConfigVersionWhenSelectTask =
+
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion();
ICompactionSelector innerSpaceCompactionSelector;
if (sequence) {
innerSpaceCompactionSelector =
@@ -162,6 +164,8 @@ public class CompactionScheduler {
.updateCompactionTaskSelectionTimeCost(
sequence ? CompactionTaskType.INNER_SEQ :
CompactionTaskType.INNER_UNSEQ,
System.currentTimeMillis() - startTime);
+ innerSpaceTaskList.forEach(
+ task ->
task.setCompactionConfigVersion(compactionConfigVersionWhenSelectTask));
// the name of this variable is trySubmitCount, because the task submitted
to the queue could be
// evicted due to the low priority of the task
int trySubmitCount = addTaskToWaitingQueue(innerSpaceTaskList);
@@ -246,6 +250,8 @@ public class CompactionScheduler {
}
String logicalStorageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();
+ long compactionConfigVersionWhenSelectTask =
+
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion();
ICrossSpaceSelector crossSpaceCompactionSelector =
config
.getCrossCompactionSelector()
@@ -263,20 +269,20 @@ public class CompactionScheduler {
// evicted due to the low priority of the task
int trySubmitCount = 0;
for (int i = 0, size = taskList.size(); i < size; ++i) {
- trySubmitCount =
- addTaskToWaitingQueue(
- Collections.singletonList(
- new CrossSpaceCompactionTask(
- timePartition,
- tsFileManager,
- taskList.get(i).getSeqFiles(),
- taskList.get(i).getUnseqFiles(),
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCrossCompactionPerformer()
- .createInstance(),
- memoryCost.get(i),
- tsFileManager.getNextCompactionTaskId())));
+ CrossSpaceCompactionTask task =
+ new CrossSpaceCompactionTask(
+ timePartition,
+ tsFileManager,
+ taskList.get(i).getSeqFiles(),
+ taskList.get(i).getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ memoryCost.get(i),
+ tsFileManager.getNextCompactionTaskId());
+ task.setCompactionConfigVersion(compactionConfigVersionWhenSelectTask);
+ trySubmitCount = addTaskToWaitingQueue(Collections.singletonList(task));
}
summary.incrementSubmitTaskNum(CompactionTaskType.CROSS, trySubmitCount);
return trySubmitCount;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index b0066398318..59899d544cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/** CompactionMergeTaskPoolManager provides a ThreadPool tPro queue and run
all compaction tasks. */
@SuppressWarnings("squid:S6548")
@@ -100,6 +101,7 @@ public class CompactionTaskManager implements IService {
: config.getCompactionReadThroughputMbPerSec() * 1024.0 *
1024.0);
private volatile boolean init = false;
+ private AtomicLong compactionConfigVersion = new AtomicLong(0);
public static CompactionTaskManager getInstance() {
return INSTANCE;
@@ -109,6 +111,14 @@ public class CompactionTaskManager implements IService {
return stopAllCompactionWorker;
}
+ public long getCurrentCompactionConfigVersion() {
+ return compactionConfigVersion.get();
+ }
+
+ public void incrCompactionConfigVersion() {
+ this.compactionConfigVersion.incrementAndGet();
+ }
+
@Override
public synchronized void start() {
if (taskExecutionPool == null
@@ -249,7 +259,8 @@ public class CompactionTaskManager implements IService {
if (init
&& !candidateCompactionTaskQueue.contains(compactionTask)
&& !isTaskRunning(compactionTask)
- && compactionTask.setSourceFilesToCompactionCandidate()) {
+ && compactionTask.setSourceFilesToCompactionCandidate()
+ && compactionTask.getCompactionConfigVersion() >=
getCurrentCompactionConfigVersion()) {
candidateCompactionTaskQueue.put(compactionTask);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 9f0a5dbbc7d..10cc73eb8bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -82,6 +82,8 @@ public class CompactionTaskQueue extends
FixedPriorityBlockingQueue<AbstractComp
private boolean checkTaskValid(AbstractCompactionTask task) {
return task.isCompactionAllowed()
+ && task.getCompactionConfigVersion()
+ >=
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion()
&& task.getEstimatedMemoryCost() <=
SystemInfo.getInstance().getMemorySizeForCompaction()
&& task.getProcessedFileNum() <=
SystemInfo.getInstance().getTotalFileLimitForCompaction();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
index 9d43c6f9ba1..871c9a815a7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
@@ -47,6 +47,8 @@ public class CompactionOverlapCheckTest extends
AbstractCompactionTest {
IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
long initChunkPointNumLowerBound =
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ boolean isEnableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
@Before
public void setUp()
@@ -54,6 +56,7 @@ public class CompactionOverlapCheckTest extends
AbstractCompactionTest {
super.setUp();
IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(10240);
IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1000);
+
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
}
@After
@@ -65,6 +68,9 @@ public class CompactionOverlapCheckTest extends
AbstractCompactionTest {
IoTDBDescriptor.getInstance()
.getConfig()
.setChunkPointNumLowerBoundInCompaction(initChunkPointNumLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(isEnableUnseqSpaceCompaction);
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
index 2e0ded23e54..14ead8dff82 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
@@ -77,6 +77,9 @@ import static org.junit.Assert.assertEquals;
public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
+ private boolean enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
@@ -84,6 +87,7 @@ public class FastInnerCompactionPerformerTest extends
AbstractCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(512);
IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(100);
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
+
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
}
@After
@@ -95,6 +99,9 @@ public class FastInnerCompactionPerformerTest extends
AbstractCompactionTest {
for (TsFileResource tsFileResource : unseqResources) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
}
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
}
/* Total 5 seq files, each file has the same 6 nonAligned timeseries, each
timeseries has the same 100 data point.*/
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 099f76fb3b7..4760251318f 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -582,7 +582,7 @@ data_replication_factor=1
# The size of candidate compaction task queue.
# Datatype: int
-# candidate_compaction_task_queue_size = 200
+# candidate_compaction_task_queue_size=50
# This parameter is used in two places:
# 1. The target tsfile size of inner space compaction.