This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 91b672da649 Allow hot reloading compaction from disabled status
(#14470)
91b672da649 is described below
commit 91b672da649a0e004f8fff3693c866f80be3f338
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 18 11:58:09 2024 +0800
Allow hot reloading compaction from disabled status (#14470)
* Allow compaction to be enabled by hot reloading when it is currently disabled
* Set non-positive values of compaction_thread_count and sub_compaction_thread_count to 1 so that the value handling conforms to the parameter comment.
* Fix unit tests (UT)
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 23 +++++++++-------------
.../impl/DataNodeInternalRPCServiceImpl.java | 7 +++----
.../config/executor/ClusterConfigTaskExecutor.java | 8 +++-----
.../compaction/schedule/CompactionTaskManager.java | 11 ++++++-----
4 files changed, 21 insertions(+), 28 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 87888d0da87..fce4bb4e933 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -770,14 +770,15 @@ public class IoTDBDescriptor {
"merge_interval_sec",
Long.toString(conf.getMergeIntervalSec())))
.map(String::trim)
.orElse(Long.toString(conf.getMergeIntervalSec()))));
- conf.setCompactionThreadCount(
+ int compactionThreadCount =
Integer.parseInt(
Optional.ofNullable(
properties.getProperty(
"compaction_thread_count",
Integer.toString(conf.getCompactionThreadCount())))
.map(String::trim)
- .orElse(Integer.toString(conf.getCompactionThreadCount()))));
+ .orElse(Integer.toString(conf.getCompactionThreadCount())));
+ conf.setCompactionThreadCount(compactionThreadCount <= 0 ? 1 :
compactionThreadCount);
int maxConcurrentAlignedSeriesInCompaction =
Integer.parseInt(
Optional.ofNullable(
@@ -1767,8 +1768,8 @@ public class IoTDBDescriptor {
CompactionScheduleTaskManager.getInstance().checkAndMayApplyConfigurationChange();
// hot load compaction task manager configurations
- loadCompactionIsEnabledHotModifiedProps(properties);
- boolean restartCompactionTaskManager =
loadCompactionThreadCountHotModifiedProps(properties);
+ boolean restartCompactionTaskManager =
loadCompactionIsEnabledHotModifiedProps(properties);
+ restartCompactionTaskManager |=
loadCompactionThreadCountHotModifiedProps(properties);
restartCompactionTaskManager |=
loadCompactionSubTaskCountHotModifiedProps(properties);
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
@@ -2071,8 +2072,7 @@ public class IoTDBDescriptor {
ConfigurationFileUtils.getConfigurationDefaultValue(
"compaction_thread_count")));
if (newConfigCompactionThreadCount <= 0) {
- LOGGER.error("compaction_thread_count must greater than 0");
- return false;
+ newConfigCompactionThreadCount = 1;
}
if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
return false;
@@ -2105,8 +2105,7 @@ public class IoTDBDescriptor {
ConfigurationFileUtils.getConfigurationDefaultValue(
"sub_compaction_thread_count")));
if (newConfigSubtaskNum <= 0) {
- LOGGER.error("sub_compaction_thread_count must greater than 0");
- return false;
+ newConfigSubtaskNum = 1;
}
if (newConfigSubtaskNum == conf.getSubCompactionTaskNum()) {
return false;
@@ -2115,7 +2114,7 @@ public class IoTDBDescriptor {
return true;
}
- private void loadCompactionIsEnabledHotModifiedProps(TrimProperties
properties)
+ private boolean loadCompactionIsEnabledHotModifiedProps(TrimProperties
properties)
throws IOException {
boolean isCompactionEnabled =
conf.isEnableSeqSpaceCompaction()
@@ -2159,14 +2158,10 @@ public class IoTDBDescriptor {
|| newConfigEnableSeqSpaceCompaction
|| newConfigEnableUnseqSpaceCompaction;
- if (!isCompactionEnabled && compactionEnabledInNewConfig) {
- LOGGER.error("Compaction cannot start in current status.");
- return;
- }
-
conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
+ return !isCompactionEnabled && compactionEnabledInNewConfig;
}
private void loadWALHotModifiedProps(TrimProperties properties) throws
IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 90635c1e029..137adeaad98 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -77,7 +77,6 @@ import
org.apache.iotdb.consensus.exception.ConsensusException;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -167,6 +166,7 @@ import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
@@ -2067,11 +2067,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (!storageEngine.isReadyForNonReadWriteFunctions()) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all
sg is ready");
}
- IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
- if (!iotdbConfig.isEnableSeqSpaceCompaction() ||
!iotdbConfig.isEnableUnseqSpaceCompaction()) {
+ if (!CompactionTaskManager.getInstance().isInit()) {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "cannot start repair task because inner space compaction is not
enabled");
+ "cannot start repair task because compaction is not enabled");
}
try {
if (storageEngine.repairData()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 0ee63cd981a..b19a6ded9bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -135,7 +135,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -265,6 +264,7 @@ import
org.apache.iotdb.db.service.DataNodeInternalRPCService;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -1198,12 +1198,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
"not all sg is ready",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
- IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
- if (!iotdbConfig.isEnableSeqSpaceCompaction()
- || !iotdbConfig.isEnableUnseqSpaceCompaction()) {
+ if (!CompactionTaskManager.getInstance().isInit()) {
future.setException(
new IoTDBException(
- "cannot start repair task because inner space compaction is
not enabled",
+ "cannot start repair task because compaction is not enabled",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 3c9021f29c6..bd0c01b6dc2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -118,11 +118,7 @@ public class CompactionTaskManager implements IService {
@Override
public synchronized void start() {
- if (taskExecutionPool == null
- &&
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0
- && (config.isEnableSeqSpaceCompaction()
- || config.isEnableUnseqSpaceCompaction()
- || config.isEnableCrossSpaceCompaction())) {
+ if (!init) {
initThreadPool();
candidateCompactionTaskQueue.regsitPollLastHook(
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
@@ -132,6 +128,10 @@ public class CompactionTaskManager implements IService {
logger.info("Compaction task manager started.");
}
+ public boolean isInit() {
+ return this.init;
+ }
+
private void initThreadPool() {
int compactionThreadNum =
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount();
this.taskExecutionPool =
@@ -222,6 +222,7 @@ public class CompactionTaskManager implements IService {
}
taskExecutionPool = null;
subCompactionTaskExecutionPool = null;
+ init = false;
storageGroupTasks.clear();
logger.info("CompactionManager stopped");
}