This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new bc966a68991 [to dev/1.3] Allow hot reloading compaction from disabled status (#14470) (#14480)
bc966a68991 is described below

commit bc966a68991acdfad911def40de9e4a751fd2bd4
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 18 17:43:51 2024 +0800

    [to dev/1.3] Allow hot reloading compaction from disabled status (#14470) (#14480)
    
    * Allow hot reloading compaction from disabled status (#14470)
    
    * Allow hot reloading compaction from disabled status
    
    * Set non-positive compaction_thread_count and sub_compaction_thread_count values to 1 so that the parameter handling conforms to the comment.
    
    * Fix unit tests (UT)
    
    * Initialize compaction schedule even when compaction is not enabled
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 37 ++++++++++++----------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  7 ++--
 .../config/executor/ClusterConfigTaskExecutor.java |  8 ++---
 .../db/storageengine/dataregion/DataRegion.java    |  5 ---
 .../compaction/schedule/CompactionTaskManager.java | 11 ++++---
 5 files changed, 32 insertions(+), 36 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 47d0f938fb3..fefc0614b2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -623,12 +623,20 @@ public class IoTDBDescriptor {
     }
     conf.setMergeIntervalSec(
         Long.parseLong(
-            properties.getProperty(
-                "merge_interval_sec", 
Long.toString(conf.getMergeIntervalSec()))));
-    conf.setCompactionThreadCount(
+            Optional.ofNullable(
+                    properties.getProperty(
+                        "merge_interval_sec", 
Long.toString(conf.getMergeIntervalSec())))
+                .map(String::trim)
+                .orElse(Long.toString(conf.getMergeIntervalSec()))));
+    int compactionThreadCount =
         Integer.parseInt(
-            properties.getProperty(
-                "compaction_thread_count", 
Integer.toString(conf.getCompactionThreadCount()))));
+            Optional.ofNullable(
+                    properties.getProperty(
+                        "compaction_thread_count",
+                        Integer.toString(conf.getCompactionThreadCount())))
+                .map(String::trim)
+                .orElse(Integer.toString(conf.getCompactionThreadCount())));
+    conf.setCompactionThreadCount(compactionThreadCount <= 0 ? 1 : 
compactionThreadCount);
     int maxConcurrentAlignedSeriesInCompaction =
         Integer.parseInt(
             properties.getProperty(
@@ -1209,8 +1217,8 @@ public class IoTDBDescriptor {
 
     
CompactionScheduleTaskManager.getInstance().checkAndMayApplyConfigurationChange();
     // hot load compaction task manager configurations
-    loadCompactionIsEnabledHotModifiedProps(properties);
-    boolean restartCompactionTaskManager = 
loadCompactionThreadCountHotModifiedProps(properties);
+    boolean restartCompactionTaskManager = 
loadCompactionIsEnabledHotModifiedProps(properties);
+    restartCompactionTaskManager |= 
loadCompactionThreadCountHotModifiedProps(properties);
     restartCompactionTaskManager |= 
loadCompactionSubTaskCountHotModifiedProps(properties);
     if (restartCompactionTaskManager) {
       CompactionTaskManager.getInstance().restart();
@@ -1425,8 +1433,7 @@ public class IoTDBDescriptor {
                 "compaction_thread_count",
                 
ConfigurationFileUtils.getConfigurationDefaultValue("compaction_thread_count")));
     if (newConfigCompactionThreadCount <= 0) {
-      LOGGER.error("compaction_thread_count must greater than 0");
-      return false;
+      newConfigCompactionThreadCount = 1;
     }
     if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
       return false;
@@ -1448,8 +1455,7 @@ public class IoTDBDescriptor {
                 ConfigurationFileUtils.getConfigurationDefaultValue(
                     "sub_compaction_thread_count")));
     if (newConfigSubtaskNum <= 0) {
-      LOGGER.error("sub_compaction_thread_count must greater than 0");
-      return false;
+      newConfigSubtaskNum = 1;
     }
     if (newConfigSubtaskNum == conf.getSubCompactionTaskNum()) {
       return false;
@@ -1458,7 +1464,8 @@ public class IoTDBDescriptor {
     return true;
   }
 
-  private void loadCompactionIsEnabledHotModifiedProps(Properties properties) 
throws IOException {
+  private boolean loadCompactionIsEnabledHotModifiedProps(Properties 
properties)
+      throws IOException {
     boolean isCompactionEnabled =
         conf.isEnableSeqSpaceCompaction()
             || conf.isEnableUnseqSpaceCompaction()
@@ -1486,14 +1493,10 @@ public class IoTDBDescriptor {
             || newConfigEnableSeqSpaceCompaction
             || newConfigEnableUnseqSpaceCompaction;
 
-    if (!isCompactionEnabled && compactionEnabledInNewConfig) {
-      LOGGER.error("Compaction cannot start in current status.");
-      return;
-    }
-
     conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
     conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
     conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
+    return !isCompactionEnabled && compactionEnabledInNewConfig;
   }
 
   private void loadWALHotModifiedProps(Properties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b3544644bf4..34dde7d0b30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -70,7 +70,6 @@ import 
org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -148,6 +147,7 @@ import org.apache.iotdb.db.service.metrics.FileMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
@@ -1806,11 +1806,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     if (!storageEngine.isReadyForNonReadWriteFunctions()) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all 
sg is ready");
     }
-    IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
-    if (!iotdbConfig.isEnableSeqSpaceCompaction() || 
!iotdbConfig.isEnableUnseqSpaceCompaction()) {
+    if (!CompactionTaskManager.getInstance().isInit()) {
       return RpcUtils.getStatus(
           TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "cannot start repair task because inner space compaction is not 
enabled");
+          "cannot start repair task because compaction is not enabled");
     }
     try {
       if (storageEngine.repairData()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b4f11e0a18d..da95ed794db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -118,7 +118,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -231,6 +230,7 @@ import 
org.apache.iotdb.db.schemaengine.template.alter.TemplateExtendInfo;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -1117,12 +1117,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 "not all sg is ready", 
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
         return future;
       }
-      IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
-      if (!iotdbConfig.isEnableSeqSpaceCompaction()
-          || !iotdbConfig.isEnableUnseqSpaceCompaction()) {
+      if (!CompactionTaskManager.getInstance().isInit()) {
         future.setException(
             new IoTDBException(
-                "cannot start repair task because inner space compaction is 
not enabled",
+                "cannot start repair task because compaction is not enabled",
                 TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
         return future;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7fd37e67587..98855a87d83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -677,11 +677,6 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public void initCompactionSchedule() {
-    if (!config.isEnableSeqSpaceCompaction()
-        && !config.isEnableUnseqSpaceCompaction()
-        && !config.isEnableCrossSpaceCompaction()) {
-      return;
-    }
     RepairUnsortedFileCompactionTask.recoverAllocatedFileTimestamp(
         tsFileManager.getMaxFileTimestampOfUnSequenceFile());
     CompactionScheduleTaskManager.getInstance().registerDataRegion(this);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 3c9021f29c6..bd0c01b6dc2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -118,11 +118,7 @@ public class CompactionTaskManager implements IService {
 
   @Override
   public synchronized void start() {
-    if (taskExecutionPool == null
-        && 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0
-        && (config.isEnableSeqSpaceCompaction()
-            || config.isEnableUnseqSpaceCompaction()
-            || config.isEnableCrossSpaceCompaction())) {
+    if (!init) {
       initThreadPool();
       candidateCompactionTaskQueue.regsitPollLastHook(
           
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
@@ -132,6 +128,10 @@ public class CompactionTaskManager implements IService {
     logger.info("Compaction task manager started.");
   }
 
+  public boolean isInit() {
+    return this.init;
+  }
+
   private void initThreadPool() {
     int compactionThreadNum = 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount();
     this.taskExecutionPool =
@@ -222,6 +222,7 @@ public class CompactionTaskManager implements IService {
     }
     taskExecutionPool = null;
     subCompactionTaskExecutionPool = null;
+    init = false;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
   }

Reply via email to