This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-cause-wal-pin in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6f7d78721cc70124b503961bc483f9f6fd409bae Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Sep 28 12:23:24 2023 +0800 new config prop: pipeSubtaskCronEventInjectorExecutionIntervalSeconds --- .../iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java | 4 +++- .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 11 +++++++++++ .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++ .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java | 4 ++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java index 8ec9798e187..20d8028855c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.runtime; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.slf4j.Logger; @@ -35,7 +36,8 @@ public class PipeCronEventInjector { private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class); - private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1; + private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = + PipeConfig.getInstance().getPipeSubtaskCronEventInjectorExecutionIntervalSeconds(); private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index c96a70d07df..1bc039f2d54 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -161,6 +161,7 @@ public class CommonConfig { private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000; private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L; private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000; + private int pipeSubtaskCronEventInjectorExecutionIntervalSeconds = 10; private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; private int pipeExtractorMatcherCacheSize = 1024; @@ -711,6 +712,16 @@ public class CommonConfig { pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs; } + public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() { + return pipeSubtaskCronEventInjectorExecutionIntervalSeconds; + } + + public void setPipeSubtaskCronEventInjectorExecutionIntervalSeconds( + int pipeSubtaskCronEventInjectorExecutionIntervalSeconds) { + this.pipeSubtaskCronEventInjectorExecutionIntervalSeconds = + pipeSubtaskCronEventInjectorExecutionIntervalSeconds; + } + public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) { this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index ab4be8eacfc..6177623fead 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -300,6 +300,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_subtask_executor_pending_queue_max_blocking_time_ms", String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs())))); + config.setPipeSubtaskCronEventInjectorExecutionIntervalSeconds( + Integer.parseInt( + properties.getProperty( + "pipe_subtask_cron_event_injector_execution_interval_seconds", + String.valueOf(config.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds())))); config.setPipeExtractorAssignerDisruptorRingBufferSize( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index f19f948d6cc..2c6e307b0cc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -71,6 +71,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(); } + public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() { + return COMMON_CONFIG.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds(); + } + /////////////////////////////// Extractor /////////////////////////////// public int getPipeExtractorAssignerDisruptorRingBufferSize() {
