This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fde91579d17 Abort old task when compaction config is modified (#12611)
fde91579d17 is described below

commit fde91579d17be22904e6dc106757d0680147154e
Author: shuwenwei <[email protected]>
AuthorDate: Wed May 29 19:07:43 2024 +0800

    Abort old task when compaction config is modified (#12611)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 +++----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 77 +++++++++++++++++-----
 .../execute/task/AbstractCompactionTask.java       | 11 ++++
 .../execute/task/CrossSpaceCompactionTask.java     | 19 ++++++
 .../execute/task/InnerSpaceCompactionTask.java     | 21 ++++++
 .../compaction/schedule/CompactionScheduler.java   | 34 ++++++----
 .../compaction/schedule/CompactionTaskManager.java | 13 +++-
 .../compaction/schedule/CompactionTaskQueue.java   |  2 +
 .../compaction/CompactionOverlapCheckTest.java     |  6 ++
 .../FastInnerCompactionPerformerTest.java          |  7 ++
 .../resources/conf/iotdb-common.properties         |  2 +-
 11 files changed, 170 insertions(+), 46 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 90afe2cc395..111979a07c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -429,13 +429,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private volatile boolean enableSeqSpaceCompaction = true;
 
   /** Enable inner space compaction for unsequence files */
-  private boolean enableUnseqSpaceCompaction = true;
+  private volatile boolean enableUnseqSpaceCompaction = true;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = true;
+  private volatile boolean enableCrossSpaceCompaction = true;
 
   /** The buffer for sort operation */
   private long sortBufferSize = 1024 * 1024L;
@@ -502,22 +502,22 @@ public class IoTDBConfig {
   private long compactionAcquireWriteLockTimeout = 60_000L;
 
   /** The max candidate file num in one inner space compaction task */
-  private int fileLimitPerInnerTask = 30;
+  private volatile int fileLimitPerInnerTask = 30;
 
   /** The max candidate file num in one cross space compaction task */
-  private int fileLimitPerCrossTask = 500;
+  private volatile int fileLimitPerCrossTask = 500;
 
   /** The max candidate file num in compaction */
-  private int totalFileLimitForCompactionTask = 5000;
+  private volatile int totalFileLimitForCompactionTask = 5000;
 
   /** The max total size of candidate files in one cross space compaction task 
*/
-  private long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 1024 * 5L;
+  private volatile long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 
1024 * 5L;
 
   /**
    * Only the unseq files whose level of inner space compaction reaches this 
value can be selected
    * to participate in the cross space compaction.
    */
-  private int minCrossCompactionUnseqFileLevel = 1;
+  private volatile int minCrossCompactionUnseqFileLevel = 1;
 
   /** The interval of compaction task schedulation in each virtual database. 
The unit is ms. */
   private long compactionScheduleIntervalInMs = 60_000L;
@@ -534,23 +534,23 @@ public class IoTDBConfig {
   /** The number of threads to be set up to select compaction task. */
   private int compactionScheduleThreadNum = 4;
 
-  private boolean enableTsFileValidation = false;
+  private volatile boolean enableTsFileValidation = false;
 
   /** The size of candidate compaction task queue. */
-  private int candidateCompactionTaskQueueSize = 200;
+  private int candidateCompactionTaskQueueSize = 50;
 
   /**
    * When the size of the mods file corresponding to TsFile exceeds this 
value, inner compaction
    * tasks containing mods files are selected first.
    */
-  private long innerCompactionTaskSelectionModsFileThreshold = 10 * 1024 * 
1024L;
+  private volatile long innerCompactionTaskSelectionModsFileThreshold = 10 * 
1024 * 1024L;
 
   /**
    * When disk availability is lower than the sum of 
(disk_space_warning_threshold +
    * inner_compaction_task_selection_disk_redundancy), inner compaction tasks 
containing mods files
    * are selected first.
    */
-  private double innerCompactionTaskSelectionDiskRedundancy = 0.05;
+  private volatile double innerCompactionTaskSelectionDiskRedundancy = 0.05;
 
   /** The size of global compaction estimation file info cahce. */
   private int globalCompactionFileInfoCacheSize = 1000;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 659026f8e24..c7d0613ac9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1147,6 +1147,10 @@ public class IoTDBDescriptor {
   }
 
   private void loadCompactionHotModifiedProps(Properties properties) throws 
InterruptedException {
+    boolean compactionTaskConfigHotModified = 
loadCompactionTaskHotModifiedProps(properties);
+    if (compactionTaskConfigHotModified) {
+      CompactionTaskManager.getInstance().incrCompactionConfigVersion();
+    }
     // hot load compaction schedule task manager configurations
     int compactionScheduleThreadNum =
         Integer.parseInt(
@@ -1165,69 +1169,118 @@ public class IoTDBDescriptor {
     if (restartCompactionTaskManager) {
       CompactionTaskManager.getInstance().restart();
     }
+
     // hot load compaction rate limit configurations
+    CompactionTaskManager.getInstance()
+        
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
+    CompactionTaskManager.getInstance()
+        
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
+    CompactionTaskManager.getInstance()
+        .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
+  }
 
+  private boolean loadCompactionTaskHotModifiedProps(Properties properties) {
+    boolean configModified = false;
     // update merge_write_throughput_mb_per_sec
+    int compactionWriteThroughput = 
conf.getCompactionWriteThroughputMbPerSec();
     conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
                 "compaction_write_throughput_mb_per_sec",
                 
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+    configModified |= compactionWriteThroughput != 
conf.getCompactionWriteThroughputMbPerSec();
 
     // update compaction_read_operation_per_sec
+    int compactionReadOperation = conf.getCompactionReadOperationPerSec();
     conf.setCompactionReadOperationPerSec(
         Integer.parseInt(
             properties.getProperty(
                 "compaction_read_operation_per_sec",
                 Integer.toString(conf.getCompactionReadOperationPerSec()))));
+    configModified |= compactionReadOperation != 
conf.getCompactionReadOperationPerSec();
 
     // update compaction_read_throughput_mb_per_sec
+    int compactionReadThroughput = conf.getCompactionReadThroughputMbPerSec();
     conf.setCompactionReadThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
                 "compaction_read_throughput_mb_per_sec",
                 
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+    configModified |= compactionReadThroughput != 
conf.getCompactionReadThroughputMbPerSec();
 
     // update max_inner_compaction_candidate_file_num
+    int maxInnerCompactionCandidateFileNum = conf.getFileLimitPerInnerTask();
     conf.setFileLimitPerInnerTask(
         Integer.parseInt(
             properties.getProperty(
                 "max_inner_compaction_candidate_file_num",
                 Integer.toString(conf.getFileLimitPerInnerTask()))));
+    configModified |= maxInnerCompactionCandidateFileNum != 
conf.getFileLimitPerInnerTask();
 
     // update target_compaction_file_size
+    long targetCompactionFilesize = conf.getTargetCompactionFileSize();
     conf.setTargetCompactionFileSize(
         Long.parseLong(
             properties.getProperty(
                 "target_compaction_file_size", 
Long.toString(conf.getTargetCompactionFileSize()))));
+    configModified |= targetCompactionFilesize != 
conf.getTargetCompactionFileSize();
 
     // update max_cross_compaction_candidate_file_num
+    int maxCrossCompactionCandidateFileNum = conf.getFileLimitPerCrossTask();
     conf.setFileLimitPerCrossTask(
         Integer.parseInt(
             properties.getProperty(
                 "max_cross_compaction_candidate_file_num",
                 Integer.toString(conf.getFileLimitPerCrossTask()))));
+    configModified |= maxCrossCompactionCandidateFileNum != 
conf.getFileLimitPerCrossTask();
 
     // update max_cross_compaction_candidate_file_size
+    long maxCrossCompactionCandidateFileSize = 
conf.getMaxCrossCompactionCandidateFileSize();
     conf.setMaxCrossCompactionCandidateFileSize(
         Long.parseLong(
             properties.getProperty(
                 "max_cross_compaction_candidate_file_size",
                 
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
+    configModified |=
+        maxCrossCompactionCandidateFileSize != 
conf.getMaxCrossCompactionCandidateFileSize();
 
     // update min_cross_compaction_unseq_file_level
+    int minCrossCompactionCandidateFileNum = 
conf.getMinCrossCompactionUnseqFileLevel();
     conf.setMinCrossCompactionUnseqFileLevel(
         Integer.parseInt(
             properties.getProperty(
                 "min_cross_compaction_unseq_file_level",
                 
Integer.toString(conf.getMinCrossCompactionUnseqFileLevel()))));
+    configModified |=
+        minCrossCompactionCandidateFileNum != 
conf.getMinCrossCompactionUnseqFileLevel();
 
-    CompactionTaskManager.getInstance()
-        
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
-    CompactionTaskManager.getInstance()
-        
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
-    CompactionTaskManager.getInstance()
-        .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
+    // update inner_compaction_task_selection_disk_redundancy
+    double innerCompactionTaskSelectionDiskRedundancy =
+        conf.getInnerCompactionTaskSelectionDiskRedundancy();
+    conf.setInnerCompactionTaskSelectionDiskRedundancy(
+        Double.parseDouble(
+            properties.getProperty(
+                "inner_compaction_task_selection_disk_redundancy",
+                
Double.toString(conf.getInnerCompactionTaskSelectionDiskRedundancy()))));
+    configModified |=
+        (Math.abs(
+                innerCompactionTaskSelectionDiskRedundancy
+                    - conf.getInnerCompactionTaskSelectionDiskRedundancy())
+            > 0.001);
+
+    // update inner_compaction_task_selection_mods_file_threshold
+    long innerCompactionTaskSelectionModsFileThreshold =
+        conf.getInnerCompactionTaskSelectionModsFileThreshold();
+    conf.setInnerCompactionTaskSelectionModsFileThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "inner_compaction_task_selection_mods_file_threshold",
+                
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
+    configModified |=
+        innerCompactionTaskSelectionModsFileThreshold
+            != conf.getInnerCompactionTaskSelectionModsFileThreshold();
+
+    return configModified;
   }
 
   private boolean loadCompactionThreadCountHotModifiedProps(Properties 
properties) {
@@ -1298,18 +1351,6 @@ public class IoTDBDescriptor {
     conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
     conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
     conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
-
-    conf.setInnerCompactionTaskSelectionDiskRedundancy(
-        Double.parseDouble(
-            properties.getProperty(
-                "inner_compaction_task_selection_disk_redundancy",
-                
Double.toString(conf.getInnerCompactionTaskSelectionDiskRedundancy()))));
-
-    conf.setInnerCompactionTaskSelectionModsFileThreshold(
-        Long.parseLong(
-            properties.getProperty(
-                "inner_compaction_task_selection_mods_file_threshold",
-                
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
   }
 
   private void loadWALHotModifiedProps(Properties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 2188ae7da5f..15c82f26417 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -81,6 +81,7 @@ public abstract class AbstractCompactionTask {
 
   private boolean memoryAcquired = false;
   private boolean fileHandleAcquired = false;
+  protected long compactionConfigVersion = Long.MAX_VALUE;
 
   protected AbstractCompactionTask(
       String storageGroupName,
@@ -114,6 +115,16 @@ public abstract class AbstractCompactionTask {
 
   public abstract List<TsFileResource> getAllSourceTsFiles();
 
+  public long getCompactionConfigVersion() {
+    // This parameter should not take effect by default unless it is 
overridden by a subclass
+    return Long.MAX_VALUE;
+  }
+
+  public void setCompactionConfigVersion(long compactionConfigVersion) {
+    // This parameter should not take effect by default unless it is 
overridden by a subclass
+    this.compactionConfigVersion = Long.MAX_VALUE;
+  }
+
   /**
    * This method will try to set the files to COMPACTION_CANDIDATE. If failed, 
it should roll back
    * all status to original value
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 2c6aa02b9b0..99aad09f47b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
@@ -31,6 +32,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -131,6 +133,13 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
       if (!tsFileManager.isAllowCompaction()) {
         return true;
       }
+      if 
(!IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction()) {
+        return true;
+      }
+      if (compactionConfigVersion
+          < 
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion()) {
+        return true;
+      }
       long startTime = System.currentTimeMillis();
       targetTsfileResourceList =
           
TsFileNameGenerator.getCrossCompactionTargetFileResources(selectedSequenceFiles);
@@ -439,4 +448,14 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
       this.summary = new CompactionTaskSummary();
     }
   }
+
+  @Override
+  public long getCompactionConfigVersion() {
+    return this.compactionConfigVersion;
+  }
+
+  @Override
+  public void setCompactionConfigVersion(long compactionConfigVersion) {
+    this.compactionConfigVersion = Math.min(this.compactionConfigVersion, 
compactionConfigVersion);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index e4c55b8addf..b836863d2cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -33,6 +34,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
@@ -169,6 +171,15 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
     if (!tsFileManager.isAllowCompaction()) {
       return true;
     }
+    if ((sequence && 
!IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction())
+        || (!sequence
+            && 
!IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction())) {
+      return true;
+    }
+    if (this.compactionConfigVersion
+        < 
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion()) {
+      return true;
+    }
     long startTime = System.currentTimeMillis();
     // get resource of target file
     recoverMemoryStatus = true;
@@ -521,4 +532,14 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
       this.summary = new CompactionTaskSummary();
     }
   }
+
+  @Override
+  public long getCompactionConfigVersion() {
+    return this.compactionConfigVersion;
+  }
+
+  @Override
+  public void setCompactionConfigVersion(long compactionConfigVersion) {
+    this.compactionConfigVersion = Math.min(this.compactionConfigVersion, 
compactionConfigVersion);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 6ebb5578670..8d57caee8fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -140,6 +140,8 @@ public class CompactionScheduler {
     String storageGroupName = tsFileManager.getStorageGroupName();
     String dataRegionId = tsFileManager.getDataRegionId();
 
+    long compactionConfigVersionWhenSelectTask =
+        
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion();
     ICompactionSelector innerSpaceCompactionSelector;
     if (sequence) {
       innerSpaceCompactionSelector =
@@ -162,6 +164,8 @@ public class CompactionScheduler {
         .updateCompactionTaskSelectionTimeCost(
             sequence ? CompactionTaskType.INNER_SEQ : 
CompactionTaskType.INNER_UNSEQ,
             System.currentTimeMillis() - startTime);
+    innerSpaceTaskList.forEach(
+        task -> 
task.setCompactionConfigVersion(compactionConfigVersionWhenSelectTask));
     // the name of this variable is trySubmitCount, because the task submitted 
to the queue could be
     // evicted due to the low priority of the task
     int trySubmitCount = addTaskToWaitingQueue(innerSpaceTaskList);
@@ -246,6 +250,8 @@ public class CompactionScheduler {
     }
     String logicalStorageGroupName = tsFileManager.getStorageGroupName();
     String dataRegionId = tsFileManager.getDataRegionId();
+    long compactionConfigVersionWhenSelectTask =
+        
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion();
     ICrossSpaceSelector crossSpaceCompactionSelector =
         config
             .getCrossCompactionSelector()
@@ -263,20 +269,20 @@ public class CompactionScheduler {
     // evicted due to the low priority of the task
     int trySubmitCount = 0;
     for (int i = 0, size = taskList.size(); i < size; ++i) {
-      trySubmitCount =
-          addTaskToWaitingQueue(
-              Collections.singletonList(
-                  new CrossSpaceCompactionTask(
-                      timePartition,
-                      tsFileManager,
-                      taskList.get(i).getSeqFiles(),
-                      taskList.get(i).getUnseqFiles(),
-                      IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getCrossCompactionPerformer()
-                          .createInstance(),
-                      memoryCost.get(i),
-                      tsFileManager.getNextCompactionTaskId())));
+      CrossSpaceCompactionTask task =
+          new CrossSpaceCompactionTask(
+              timePartition,
+              tsFileManager,
+              taskList.get(i).getSeqFiles(),
+              taskList.get(i).getUnseqFiles(),
+              IoTDBDescriptor.getInstance()
+                  .getConfig()
+                  .getCrossCompactionPerformer()
+                  .createInstance(),
+              memoryCost.get(i),
+              tsFileManager.getNextCompactionTaskId());
+      task.setCompactionConfigVersion(compactionConfigVersionWhenSelectTask);
+      trySubmitCount = addTaskToWaitingQueue(Collections.singletonList(task));
     }
     summary.incrementSubmitTaskNum(CompactionTaskType.CROSS, trySubmitCount);
     return trySubmitCount;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index b0066398318..59899d544cd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** CompactionMergeTaskPoolManager provides a ThreadPool tPro queue and run 
all compaction tasks. */
 @SuppressWarnings("squid:S6548")
@@ -100,6 +101,7 @@ public class CompactionTaskManager implements IService {
               : config.getCompactionReadThroughputMbPerSec() * 1024.0 * 
1024.0);
 
   private volatile boolean init = false;
+  private AtomicLong compactionConfigVersion = new AtomicLong(0);
 
   public static CompactionTaskManager getInstance() {
     return INSTANCE;
@@ -109,6 +111,14 @@ public class CompactionTaskManager implements IService {
     return stopAllCompactionWorker;
   }
 
+  public long getCurrentCompactionConfigVersion() {
+    return compactionConfigVersion.get();
+  }
+
+  public void incrCompactionConfigVersion() {
+    this.compactionConfigVersion.incrementAndGet();
+  }
+
   @Override
   public synchronized void start() {
     if (taskExecutionPool == null
@@ -249,7 +259,8 @@ public class CompactionTaskManager implements IService {
     if (init
         && !candidateCompactionTaskQueue.contains(compactionTask)
         && !isTaskRunning(compactionTask)
-        && compactionTask.setSourceFilesToCompactionCandidate()) {
+        && compactionTask.setSourceFilesToCompactionCandidate()
+        && compactionTask.getCompactionConfigVersion() >= 
getCurrentCompactionConfigVersion()) {
       candidateCompactionTaskQueue.put(compactionTask);
       return true;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 9f0a5dbbc7d..10cc73eb8bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -82,6 +82,8 @@ public class CompactionTaskQueue extends 
FixedPriorityBlockingQueue<AbstractComp
 
   private boolean checkTaskValid(AbstractCompactionTask task) {
     return task.isCompactionAllowed()
+        && task.getCompactionConfigVersion()
+            >= 
CompactionTaskManager.getInstance().getCurrentCompactionConfigVersion()
         && task.getEstimatedMemoryCost() <= 
SystemInfo.getInstance().getMemorySizeForCompaction()
         && task.getProcessedFileNum() <= 
SystemInfo.getInstance().getTotalFileLimitForCompaction();
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
index 9d43c6f9ba1..871c9a815a7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionOverlapCheckTest.java
@@ -47,6 +47,8 @@ public class CompactionOverlapCheckTest extends 
AbstractCompactionTest {
       
IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
   long initChunkPointNumLowerBound =
       
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+  boolean isEnableUnseqSpaceCompaction =
+      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
 
   @Before
   public void setUp()
@@ -54,6 +56,7 @@ public class CompactionOverlapCheckTest extends 
AbstractCompactionTest {
     super.setUp();
     
IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(10240);
     
IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1000);
+    
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
   }
 
   @After
@@ -65,6 +68,9 @@ public class CompactionOverlapCheckTest extends 
AbstractCompactionTest {
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setChunkPointNumLowerBoundInCompaction(initChunkPointNumLowerBound);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(isEnableUnseqSpaceCompaction);
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
index 2e0ded23e54..14ead8dff82 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
@@ -77,6 +77,9 @@ import static org.junit.Assert.assertEquals;
 
 public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
 
+  private boolean enableUnseqSpaceCompaction =
+      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+
   @Before
   public void setUp()
       throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
@@ -84,6 +87,7 @@ public class FastInnerCompactionPerformerTest extends 
AbstractCompactionTest {
     IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(512);
     IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(100);
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
+    
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
   }
 
   @After
@@ -95,6 +99,9 @@ public class FastInnerCompactionPerformerTest extends 
AbstractCompactionTest {
     for (TsFileResource tsFileResource : unseqResources) {
       
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
     }
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
   }
 
   /* Total 5 seq files, each file has the same 6 nonAligned timeseries, each 
timeseries has the same 100 data point.*/
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 099f76fb3b7..4760251318f 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -582,7 +582,7 @@ data_replication_factor=1
 
 # The size of candidate compaction task queue.
 # Datatype: int
-# candidate_compaction_task_queue_size = 200
+# candidate_compaction_task_queue_size=50
 
 # This parameter is used in two places:
 # 1. The target tsfile size of inner space compaction.

Reply via email to