This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-cause-wal-pin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6f7d78721cc70124b503961bc483f9f6fd409bae
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 28 12:23:24 2023 +0800

    new config prop: pipeSubtaskCronEventInjectorExecutionIntervalSeconds
---
 .../iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java    |  4 +++-
 .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 11 +++++++++++
 .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java  |  5 +++++
 .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java |  4 ++++
 4 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
index 8ec9798e187..20d8028855c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.runtime;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 
 import org.slf4j.Logger;
@@ -35,7 +36,8 @@ public class PipeCronEventInjector {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class);
 
-  private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1;
+  private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+      PipeConfig.getInstance().getPipeSubtaskCronEventInjectorExecutionIntervalSeconds();
 
   private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c96a70d07df..1bc039f2d54 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -161,6 +161,7 @@ public class CommonConfig {
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
+  private int pipeSubtaskCronEventInjectorExecutionIntervalSeconds = 10;
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private int pipeExtractorMatcherCacheSize = 1024;
@@ -711,6 +712,16 @@ public class CommonConfig {
         pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
 
+  public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() {
+    return pipeSubtaskCronEventInjectorExecutionIntervalSeconds;
+  }
+
+  public void setPipeSubtaskCronEventInjectorExecutionIntervalSeconds(
+      int pipeSubtaskCronEventInjectorExecutionIntervalSeconds) {
+    this.pipeSubtaskCronEventInjectorExecutionIntervalSeconds =
+        pipeSubtaskCronEventInjectorExecutionIntervalSeconds;
+  }
+
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab4be8eacfc..6177623fead 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -300,6 +300,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+    config.setPipeSubtaskCronEventInjectorExecutionIntervalSeconds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_subtask_cron_event_injector_execution_interval_seconds",
+                String.valueOf(config.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds()))));
 
     config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index f19f948d6cc..2c6e307b0cc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -71,6 +71,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
   }
 
+  public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() {
+    return COMMON_CONFIG.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds();
+  }
+
   /////////////////////////////// Extractor ///////////////////////////////
 
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {

Reply via email to