This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 91b672da649 Allow hot reloading compaction from disabled status (#14470)
91b672da649 is described below

commit 91b672da649a0e004f8fff3693c866f80be3f338
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 18 11:58:09 2024 +0800

    Allow hot reloading compaction from disabled status (#14470)
    
    * allow hot reloading compaction from disabled status
    
    * Modify the value setting of the parameter to conform to the comment.
    
    * fix ut
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 23 +++++++++-------------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  7 +++----
 .../config/executor/ClusterConfigTaskExecutor.java |  8 +++-----
 .../compaction/schedule/CompactionTaskManager.java | 11 ++++++-----
 4 files changed, 21 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 87888d0da87..fce4bb4e933 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -770,14 +770,15 @@ public class IoTDBDescriptor {
                         "merge_interval_sec", 
Long.toString(conf.getMergeIntervalSec())))
                 .map(String::trim)
                 .orElse(Long.toString(conf.getMergeIntervalSec()))));
-    conf.setCompactionThreadCount(
+    int compactionThreadCount =
         Integer.parseInt(
             Optional.ofNullable(
                     properties.getProperty(
                         "compaction_thread_count",
                         Integer.toString(conf.getCompactionThreadCount())))
                 .map(String::trim)
-                .orElse(Integer.toString(conf.getCompactionThreadCount()))));
+                .orElse(Integer.toString(conf.getCompactionThreadCount())));
+    conf.setCompactionThreadCount(compactionThreadCount <= 0 ? 1 : 
compactionThreadCount);
     int maxConcurrentAlignedSeriesInCompaction =
         Integer.parseInt(
             Optional.ofNullable(
@@ -1767,8 +1768,8 @@ public class IoTDBDescriptor {
 
     
CompactionScheduleTaskManager.getInstance().checkAndMayApplyConfigurationChange();
     // hot load compaction task manager configurations
-    loadCompactionIsEnabledHotModifiedProps(properties);
-    boolean restartCompactionTaskManager = 
loadCompactionThreadCountHotModifiedProps(properties);
+    boolean restartCompactionTaskManager = 
loadCompactionIsEnabledHotModifiedProps(properties);
+    restartCompactionTaskManager |= 
loadCompactionThreadCountHotModifiedProps(properties);
     restartCompactionTaskManager |= 
loadCompactionSubTaskCountHotModifiedProps(properties);
     if (restartCompactionTaskManager) {
       CompactionTaskManager.getInstance().restart();
@@ -2071,8 +2072,7 @@ public class IoTDBDescriptor {
                     ConfigurationFileUtils.getConfigurationDefaultValue(
                         "compaction_thread_count")));
     if (newConfigCompactionThreadCount <= 0) {
-      LOGGER.error("compaction_thread_count must greater than 0");
-      return false;
+      newConfigCompactionThreadCount = 1;
     }
     if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
       return false;
@@ -2105,8 +2105,7 @@ public class IoTDBDescriptor {
                     ConfigurationFileUtils.getConfigurationDefaultValue(
                         "sub_compaction_thread_count")));
     if (newConfigSubtaskNum <= 0) {
-      LOGGER.error("sub_compaction_thread_count must greater than 0");
-      return false;
+      newConfigSubtaskNum = 1;
     }
     if (newConfigSubtaskNum == conf.getSubCompactionTaskNum()) {
       return false;
@@ -2115,7 +2114,7 @@ public class IoTDBDescriptor {
     return true;
   }
 
-  private void loadCompactionIsEnabledHotModifiedProps(TrimProperties 
properties)
+  private boolean loadCompactionIsEnabledHotModifiedProps(TrimProperties 
properties)
       throws IOException {
     boolean isCompactionEnabled =
         conf.isEnableSeqSpaceCompaction()
@@ -2159,14 +2158,10 @@ public class IoTDBDescriptor {
             || newConfigEnableSeqSpaceCompaction
             || newConfigEnableUnseqSpaceCompaction;
 
-    if (!isCompactionEnabled && compactionEnabledInNewConfig) {
-      LOGGER.error("Compaction cannot start in current status.");
-      return;
-    }
-
     conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
     conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
     conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
+    return !isCompactionEnabled && compactionEnabledInNewConfig;
   }
 
   private void loadWALHotModifiedProps(TrimProperties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 90635c1e029..137adeaad98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -77,7 +77,6 @@ import 
org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -167,6 +166,7 @@ import org.apache.iotdb.db.service.metrics.FileMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
 import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
@@ -2067,11 +2067,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     if (!storageEngine.isReadyForNonReadWriteFunctions()) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all 
sg is ready");
     }
-    IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
-    if (!iotdbConfig.isEnableSeqSpaceCompaction() || 
!iotdbConfig.isEnableUnseqSpaceCompaction()) {
+    if (!CompactionTaskManager.getInstance().isInit()) {
       return RpcUtils.getStatus(
           TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "cannot start repair task because inner space compaction is not 
enabled");
+          "cannot start repair task because compaction is not enabled");
     }
     try {
       if (storageEngine.repairData()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 0ee63cd981a..b19a6ded9bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -135,7 +135,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -265,6 +264,7 @@ import 
org.apache.iotdb.db.service.DataNodeInternalRPCService;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -1198,12 +1198,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 "not all sg is ready", 
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
         return future;
       }
-      IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
-      if (!iotdbConfig.isEnableSeqSpaceCompaction()
-          || !iotdbConfig.isEnableUnseqSpaceCompaction()) {
+      if (!CompactionTaskManager.getInstance().isInit()) {
         future.setException(
             new IoTDBException(
-                "cannot start repair task because inner space compaction is 
not enabled",
+                "cannot start repair task because compaction is not enabled",
                 TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
         return future;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 3c9021f29c6..bd0c01b6dc2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -118,11 +118,7 @@ public class CompactionTaskManager implements IService {
 
   @Override
   public synchronized void start() {
-    if (taskExecutionPool == null
-        && 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0
-        && (config.isEnableSeqSpaceCompaction()
-            || config.isEnableUnseqSpaceCompaction()
-            || config.isEnableCrossSpaceCompaction())) {
+    if (!init) {
       initThreadPool();
       candidateCompactionTaskQueue.regsitPollLastHook(
           
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
@@ -132,6 +128,10 @@ public class CompactionTaskManager implements IService {
     logger.info("Compaction task manager started.");
   }
 
+  public boolean isInit() {
+    return this.init;
+  }
+
   private void initThreadPool() {
     int compactionThreadNum = 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount();
     this.taskExecutionPool =
@@ -222,6 +222,7 @@ public class CompactionTaskManager implements IService {
     }
     taskExecutionPool = null;
     subCompactionTaskExecutionPool = null;
+    init = false;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
   }

Reply via email to