This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3464e95f495 Load: Support configure
load_clean_up_task_execution_delay_time_seconds of a load task (#11936)
3464e95f495 is described below
commit 3464e95f4950d3b385e73dbf8b1388fe6d82d94a
Author: Itami Sho <[email protected]>
AuthorDate: Sun Jan 21 15:20:12 2024 +0800
Load: Support configure load_clean_up_task_execution_delay_time_seconds of
a load task (#11936)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 +++++++++++++---
.../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 ++++++++++++
.../db/pipe/agent/runtime/PipePeriodicalJobExecutor.java | 1 +
.../db/queryengine/execution/load/LoadTsFileManager.java | 7 +++----
.../plan/scheduler/load/LoadTsFileScheduler.java | 8 ++++++--
.../src/assembly/resources/conf/iotdb-common.properties | 8 ++++++++
6 files changed, 43 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ce5c4e5796a..0fe486f1d8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1092,13 +1092,14 @@ public class IoTDBConfig {
private double maxAllocateMemoryRatioForLoad = 0.8;
private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
-
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
- 0; // 0 means that the decision will be adaptive based on the number of
sequences
+ 0L; // 0 means that the decision will be adaptive based on the number of
sequences
- private long loadMemoryAllocateRetryIntervalMs = 1000;
+ private long loadMemoryAllocateRetryIntervalMs = 1000L;
private int loadMemoryAllocateMaxRetries = 5;
+ private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min
+
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during
querying */
private String[] pipeReceiverFileDirs = new String[0];
@@ -3764,6 +3765,15 @@ public class IoTDBConfig {
this.loadMemoryAllocateMaxRetries = loadMemoryAllocateMaxRetries;
}
+ public long getLoadCleanupTaskExecutionDelayTimeSeconds() {
+ return loadCleanupTaskExecutionDelayTimeSeconds;
+ }
+
+ public void setLoadCleanupTaskExecutionDelayTimeSeconds(
+ long loadCleanupTaskExecutionDelayTimeSeconds) {
+ this.loadCleanupTaskExecutionDelayTimeSeconds =
loadCleanupTaskExecutionDelayTimeSeconds;
+ }
+
public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5f0616bc08c..f6e56fd60c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -902,6 +902,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_tsfile_analyze_schema_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+ conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "load_clean_up_task_execution_delay_time_seconds",
+
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir",
conf.getExtPipeDir()).trim());
@@ -1628,6 +1633,13 @@ public class IoTDBDescriptor {
// update compaction config
loadCompactionHotModifiedProps(properties);
+
+ // update load config
+ conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "load_clean_up_task_execution_delay_time_seconds",
+
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload
configuration because %s", e));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index 3202745533d..61000451cfd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -50,6 +50,7 @@ public class PipePeriodicalJobExecutor {
private static final long MIN_INTERVAL_SECONDS =
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+
private long rounds;
private Future<?> executorFuture;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 1d5cbd0205f..bbd3d3a760d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -91,7 +90,7 @@ public class LoadTsFileManager {
.registerPeriodicalJob(
"LoadTsFileManager#cleanupTasks",
this::cleanupTasks,
- LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND);
+ CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() >> 2);
}
private void cleanupTasks() {
@@ -138,7 +137,7 @@ public class LoadTsFileManager {
synchronized (uuid2CleanupTask) {
final CleanupTask cleanupTask =
- new CleanupTask(uuid,
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+ new CleanupTask(uuid,
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
uuid2CleanupTask.put(uuid, cleanupTask);
cleanupTaskQueue.add(cleanupTask);
}
@@ -150,7 +149,7 @@ public class LoadTsFileManager {
if (!uuid2WriterManager.containsKey(uuid)) {
synchronized (uuid2CleanupTask) {
final CleanupTask cleanupTask =
- new CleanupTask(uuid,
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+ new CleanupTask(uuid,
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
uuid2CleanupTask.put(uuid, cleanupTask);
cleanupTaskQueue.add(cleanupTask);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index ecdcbbc5922..9a11b05e2df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -98,8 +99,11 @@ import java.util.stream.IntStream;
*
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe">...</a>;
*/
public class LoadTsFileScheduler implements IScheduler {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileScheduler.class);
- public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900L; // 15min
+
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2;
private static final int TRANSMIT_LIMIT =
@@ -251,7 +255,7 @@ public class LoadTsFileScheduler implements IScheduler {
try {
FragInstanceDispatchResult result =
dispatchResultFuture.get(
- LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
TimeUnit.SECONDS);
+ CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(),
TimeUnit.SECONDS);
if (!result.isSuccessful()) {
// TODO: retry.
LOGGER.warn(
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index cf108f14291..b70df894ee7 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1099,3 +1099,11 @@ data_replication_factor=1
# The thread count which can be used for model inference operation.
# model_inference_execution_thread_count=5
+
+####################
+### Load TsFile Configuration
+####################
+
+# The load clean-up task is used to clean up unsuccessfully loaded TsFiles after a certain period of time.
+# The parameter is the delay time after an unsuccessful load operation (in seconds).
+# load_clean_up_task_execution_delay_time_seconds=1800