This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3464e95f495 Load: Support configure load_clean_up_task_execution_delay_time_seconds of a load task (#11936)
3464e95f495 is described below

commit 3464e95f4950d3b385e73dbf8b1388fe6d82d94a
Author: Itami Sho <[email protected]>
AuthorDate: Sun Jan 21 15:20:12 2024 +0800

    Load: Support configure load_clean_up_task_execution_delay_time_seconds of a load task (#11936)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 16 +++++++++++++---
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   | 12 ++++++++++++
 .../db/pipe/agent/runtime/PipePeriodicalJobExecutor.java |  1 +
 .../db/queryengine/execution/load/LoadTsFileManager.java |  7 +++----
 .../plan/scheduler/load/LoadTsFileScheduler.java         |  8 ++++++--
 .../src/assembly/resources/conf/iotdb-common.properties  |  8 ++++++++
 6 files changed, 43 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ce5c4e5796a..0fe486f1d8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1092,13 +1092,14 @@ public class IoTDBConfig {
   private double maxAllocateMemoryRatioForLoad = 0.8;
 
   private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
-
   private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
-      0; // 0 means that the decision will be adaptive based on the number of 
sequences
+      0L; // 0 means that the decision will be adaptive based on the number of 
sequences
 
-  private long loadMemoryAllocateRetryIntervalMs = 1000;
+  private long loadMemoryAllocateRetryIntervalMs = 1000L;
   private int loadMemoryAllocateMaxRetries = 5;
 
+  private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min
+
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during querying */
   private String[] pipeReceiverFileDirs = new String[0];
@@ -3764,6 +3765,15 @@ public class IoTDBConfig {
     this.loadMemoryAllocateMaxRetries = loadMemoryAllocateMaxRetries;
   }
 
+  public long getLoadCleanupTaskExecutionDelayTimeSeconds() {
+    return loadCleanupTaskExecutionDelayTimeSeconds;
+  }
+
+  public void setLoadCleanupTaskExecutionDelayTimeSeconds(
+      long loadCleanupTaskExecutionDelayTimeSeconds) {
+    this.loadCleanupTaskExecutionDelayTimeSeconds = 
loadCleanupTaskExecutionDelayTimeSeconds;
+  }
+
   public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
     this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5f0616bc08c..f6e56fd60c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -902,6 +902,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_tsfile_analyze_schema_memory_size_in_bytes",
                 
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+    conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "load_clean_up_task_execution_delay_time_seconds",
+                
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", 
conf.getExtPipeDir()).trim());
 
@@ -1628,6 +1633,13 @@ public class IoTDBDescriptor {
 
       // update compaction config
       loadCompactionHotModifiedProps(properties);
+
+      // update load config
+      conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
+          Long.parseLong(
+              properties.getProperty(
+                  "load_clean_up_task_execution_delay_time_seconds",
+                  
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload 
configuration because %s", e));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index 3202745533d..61000451cfd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -50,6 +50,7 @@ public class PipePeriodicalJobExecutor {
 
   private static final long MIN_INTERVAL_SECONDS =
       
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+
   private long rounds;
   private Future<?> executorFuture;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 1d5cbd0205f..bbd3d3a760d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -91,7 +90,7 @@ public class LoadTsFileManager {
         .registerPeriodicalJob(
             "LoadTsFileManager#cleanupTasks",
             this::cleanupTasks,
-            LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND);
+            CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() >> 2);
   }
 
   private void cleanupTasks() {
@@ -138,7 +137,7 @@ public class LoadTsFileManager {
 
       synchronized (uuid2CleanupTask) {
         final CleanupTask cleanupTask =
-            new CleanupTask(uuid, 
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+            new CleanupTask(uuid, 
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
         uuid2CleanupTask.put(uuid, cleanupTask);
         cleanupTaskQueue.add(cleanupTask);
       }
@@ -150,7 +149,7 @@ public class LoadTsFileManager {
     if (!uuid2WriterManager.containsKey(uuid)) {
       synchronized (uuid2CleanupTask) {
         final CleanupTask cleanupTask =
-            new CleanupTask(uuid, 
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+            new CleanupTask(uuid, 
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
         uuid2CleanupTask.put(uuid, cleanupTask);
         cleanupTaskQueue.add(cleanupTask);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index ecdcbbc5922..9a11b05e2df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -98,8 +99,11 @@ import java.util.stream.IntStream;
  * 
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe";>...</a>;
  */
 public class LoadTsFileScheduler implements IScheduler {
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileScheduler.class);
-  public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900L; // 15min
+
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
   private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2;
   private static final int TRANSMIT_LIMIT =
@@ -251,7 +255,7 @@ public class LoadTsFileScheduler implements IScheduler {
     try {
       FragInstanceDispatchResult result =
           dispatchResultFuture.get(
-              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, 
TimeUnit.SECONDS);
+              CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(), 
TimeUnit.SECONDS);
       if (!result.isSuccessful()) {
         // TODO: retry.
         LOGGER.warn(
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index cf108f14291..b70df894ee7 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1099,3 +1099,11 @@ data_replication_factor=1
 
 # The thread count which can be used for model inference operation.
 # model_inference_execution_thread_count=5
+
+####################
+### Load TsFile Configuration
+####################
+
+# Load clean up task is used to clean up the unsuccessful loaded tsfile after a certain period of time.
+# The parameter is the delay time after an unsuccessful load operation (in seconds).
+# load_clean_up_task_execution_delay_time_seconds=1800

Reply via email to