This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 720ad0d5b8f [To rel/1.2] add hot load compaction configs (#10759)
720ad0d5b8f is described below
commit 720ad0d5b8f08bc740f97ed981382072570ac794
Author: shuwenwei <[email protected]>
AuthorDate: Wed Sep 6 15:16:10 2023 +0800
[To rel/1.2] add hot load compaction configs (#10759)
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 92 ++++++++++++++++++++++
.../compaction/schedule/CompactionTaskManager.java | 1 -
2 files changed, 92 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1fe268bb085..114354ac009 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.CrossCompactionSelector;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.InnerSequenceCompactionSelector;
@@ -1101,6 +1102,94 @@ public class IoTDBDescriptor {
loadWALHotModifiedProps(properties);
}
+ /**
+  * Applies the compaction settings that can be changed while the node is running.
+  *
+  * <p>In order: updates the compaction validation level, reloads the three compaction enable
+  * switches, reloads the compaction thread count and the sub-compaction task count, and restarts
+  * the {@code CompactionTaskManager} when either of the two counts reports a change.
+  *
+  * @param properties the freshly loaded properties; any missing key falls back to the value
+  *     currently held in {@code conf}
+  * @throws InterruptedException propagated from {@code CompactionTaskManager.restart()}
+  */
+ private void loadCompactionHotModifiedProps(Properties properties) throws InterruptedException {
+   String validationLevelName =
+       properties.getProperty(
+           "compaction_validation_level", conf.getCompactionValidationLevel().toString());
+   conf.setCompactionValidationLevel(CompactionValidationLevel.valueOf(validationLevelName));
+
+   loadCompactionIsEnabledHotModifiedProps(properties);
+
+   // Both loaders are evaluated unconditionally: each one also writes its value into conf, so
+   // they must not be combined with a short-circuiting operator.
+   boolean threadCountChanged = loadCompactionThreadCountHotModifiedProps(properties);
+   boolean subTaskCountChanged = loadCompactionSubTaskCountHotModifiedProps(properties);
+
+   if (threadCountChanged || subTaskCountChanged) {
+     CompactionTaskManager.getInstance().restart();
+   }
+ }
+
+ /**
+  * Reloads {@code compaction_thread_count} and stores it in {@code conf} when it is valid and
+  * different from the current value.
+  *
+  * @param properties the freshly loaded properties; a missing key falls back to the current
+  *     thread count, which results in "no change"
+  * @return {@code true} if a new thread count was stored; {@code false} if the value was
+  *     rejected (not greater than 0) or is equal to the current one
+  * @throws NumberFormatException if the configured value is not a parsable integer
+  */
+ private boolean loadCompactionThreadCountHotModifiedProps(Properties properties) {
+   int newConfigCompactionThreadCount =
+       Integer.parseInt(
+           properties.getProperty(
+               "compaction_thread_count", Integer.toString(conf.getCompactionThreadCount())));
+   if (newConfigCompactionThreadCount <= 0) {
+     // Reject the value and keep the current setting.
+     logger.error("compaction_thread_count must greater than 0");
+     return false;
+   }
+   if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
+     return false;
+   }
+   // Store exactly the value that was validated above rather than reading and parsing the
+   // property a second time.
+   conf.setCompactionThreadCount(newConfigCompactionThreadCount);
+   return true;
+ }
+
+ /**
+  * Reloads {@code sub_compaction_thread_count} and stores it in {@code conf} when it is valid
+  * and different from the current sub-compaction task number.
+  *
+  * @param properties the freshly loaded properties; a missing key falls back to the current
+  *     sub-compaction task number
+  * @return {@code true} if a new value was stored; {@code false} if the value was rejected (not
+  *     greater than 0) or is unchanged
+  */
+ private boolean loadCompactionSubTaskCountHotModifiedProps(Properties properties) {
+   String rawSubTaskNum =
+       properties.getProperty(
+           "sub_compaction_thread_count", Integer.toString(conf.getSubCompactionTaskNum()));
+   int requestedSubTaskNum = Integer.parseInt(rawSubTaskNum);
+
+   if (requestedSubTaskNum <= 0) {
+     logger.error("sub_compaction_thread_count must greater than 0");
+     return false;
+   }
+
+   boolean changed = requestedSubTaskNum != conf.getSubCompactionTaskNum();
+   if (changed) {
+     conf.setSubCompactionTaskNum(requestedSubTaskNum);
+   }
+   return changed;
+ }
+
+ /**
+  * Reloads the three compaction switches ({@code enable_cross_space_compaction}, {@code
+  * enable_seq_space_compaction}, {@code enable_unseq_space_compaction}).
+  *
+  * <p>If every switch is currently off and the new configuration turns at least one of them on,
+  * the request is refused: an error is logged and none of the three switches is modified. In
+  * every other case all three switches are overwritten with the newly read values.
+  *
+  * @param properties the freshly loaded properties; a missing key falls back to the switch's
+  *     current value
+  */
+ private void loadCompactionIsEnabledHotModifiedProps(Properties properties) {
+   boolean enabledBeforeReload =
+       conf.isEnableSeqSpaceCompaction()
+           || conf.isEnableUnseqSpaceCompaction()
+           || conf.isEnableCrossSpaceCompaction();
+
+   String rawCross =
+       properties.getProperty(
+           "enable_cross_space_compaction",
+           Boolean.toString(conf.isEnableCrossSpaceCompaction()));
+   boolean crossRequested = Boolean.parseBoolean(rawCross);
+
+   String rawSeq =
+       properties.getProperty(
+           "enable_seq_space_compaction", Boolean.toString(conf.isEnableSeqSpaceCompaction()));
+   boolean seqRequested = Boolean.parseBoolean(rawSeq);
+
+   String rawUnseq =
+       properties.getProperty(
+           "enable_unseq_space_compaction",
+           Boolean.toString(conf.isEnableUnseqSpaceCompaction()));
+   boolean unseqRequested = Boolean.parseBoolean(rawUnseq);
+
+   boolean enabledAfterReload = crossRequested || seqRequested || unseqRequested;
+
+   // The switches are applied unless this reload would move compaction from fully disabled to
+   // (partly) enabled.
+   if (enabledBeforeReload || !enabledAfterReload) {
+     conf.setEnableCrossSpaceCompaction(crossRequested);
+     conf.setEnableSeqSpaceCompaction(seqRequested);
+     conf.setEnableUnseqSpaceCompaction(unseqRequested);
+   } else {
+     logger.error("Compaction cannot start in current status.");
+   }
+ }
+
private void loadWALHotModifiedProps(Properties properties) {
long walAsyncModeFsyncDelayInMs =
Long.parseLong(
@@ -1518,6 +1607,9 @@ public class IoTDBDescriptor {
WALManager.getInstance().rebootWALDeleteThread();
}
+ // update compaction config
+ loadCompactionHotModifiedProps(properties);
+
// update schema quota configuration
conf.setClusterSchemaLimitLevel(
properties
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 6f8e561a3f9..74f3a99f483 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -398,7 +398,6 @@ public class CompactionTaskManager implements IService {
return storageGroupName + "-" + dataRegionId;
}
- @TestOnly
public void restart() throws InterruptedException {
if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() >
0) {
if (subCompactionTaskExecutionPool != null) {