This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch wait_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/wait_time by this push:
     new 7a4ba437070 fix
7a4ba437070 is described below

commit 7a4ba437070b9fe1acc20129c3b8d2287cab12df
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 25 10:34:48 2025 +0800

    fix
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  9 +++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 30 ++++++++++++----------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 10 ++++++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 12 ++++-----
 4 files changed, 36 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 26d64108724..b23a6f2ed49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -51,8 +52,6 @@ import java.util.Objects;
 public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSinkSubtask.class);
-  private static final int SLEEP_MAX_INTERVAL_MS = 1000;
-  private static final int SLEEP_INIT_INTERVAL_MS = 250;
 
   // For input
   protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
@@ -60,7 +59,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
   // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private final String attributeSortedString;
   private final int connectorIndex;
-  private int sleepInterval = SLEEP_INIT_INTERVAL_MS;
+  private long sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
 
   // Now parallel connectors run the same time, thus the heartbeat events are 
not sure
   // to trigger the general event transfer function, causing potentially such 
as
@@ -135,7 +134,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       }
 
       decreaseReferenceCountAndReleaseLastEvent(event, true);
-      sleepInterval = SLEEP_INIT_INTERVAL_MS;
+      sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
     } catch (final PipeNonReportException e) {
       sleep4NonReportException();
     } catch (final PipeException e) {
@@ -228,7 +227,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
   }
 
   private void sleep4NonReportException() {
-    if (sleepInterval < SLEEP_MAX_INTERVAL_MS) {
+    if (sleepInterval < 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs()) {
       sleepInterval <<= 1;
     }
     try {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 788d27f3ccf..4e748ce647c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,8 +254,8 @@ public class CommonConfig {
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
 
-  private long pipeSinkSubtaskSleepIntervalMs = 250L;
-  private long pipeSinkSubtaskSleepMaxMs = 1000L;
+  private long pipeSinkSubtaskSleepIntervalInitMs = 250L;
+  private long pipeSinkSubtaskSleepIntervalMaxMs = 1000L;
 
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
 
@@ -1418,28 +1418,30 @@ public class CommonConfig {
         "pipeRetryLocallyForParallelOrUserConflict is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
   }
 
-  public long getPipeSinkSubtaskSleepIntervalMs() {
-    return pipeSinkSubtaskSleepIntervalMs;
+  public long getPipeSinkSubtaskSleepIntervalInitMs() {
+    return pipeSinkSubtaskSleepIntervalInitMs;
   }
 
-  public void setPipeSinkSubtaskSleepIntervalMs(long 
pipeSinkSubtaskSleepIntervalMs) {
-    if (this.pipeSinkSubtaskSleepIntervalMs == pipeSinkSubtaskSleepIntervalMs) 
{
+  public void setPipeSinkSubtaskSleepIntervalInitMs(long 
pipeSinkSubtaskSleepIntervalInitMs) {
+    if (this.pipeSinkSubtaskSleepIntervalInitMs == 
pipeSinkSubtaskSleepIntervalInitMs) {
       return;
     }
-    this.pipeSinkSubtaskSleepIntervalMs = pipeSinkSubtaskSleepIntervalMs;
-    logger.info("pipeSinkSubtaskSleepIntervalMs is set to {}.", 
pipeSinkSubtaskSleepIntervalMs);
+    this.pipeSinkSubtaskSleepIntervalInitMs = 
pipeSinkSubtaskSleepIntervalInitMs;
+    logger.info(
+        "pipeSinkSubtaskSleepIntervalInitMs is set to {}.", 
pipeSinkSubtaskSleepIntervalInitMs);
   }
 
-  public long getPipeSinkSubtaskSleepMaxMs() {
-    return pipeSinkSubtaskSleepMaxMs;
+  public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+    return pipeSinkSubtaskSleepIntervalMaxMs;
   }
 
-  public void setPipeSinkSubtaskSleepMaxMs(long pipeSinkSubtaskSleepMaxMs) {
-    if (this.pipeSinkSubtaskSleepMaxMs == pipeSinkSubtaskSleepMaxMs) {
+  public void setPipeSinkSubtaskSleepIntervalMaxMs(long 
pipeSinkSubtaskSleepIntervalMaxMs) {
+    if (this.pipeSinkSubtaskSleepIntervalMaxMs == 
pipeSinkSubtaskSleepIntervalMaxMs) {
       return;
     }
-    this.pipeSinkSubtaskSleepMaxMs = pipeSinkSubtaskSleepMaxMs;
-    logger.info("pipeSinkSubtaskSleepMaxMs is set to {}.", 
pipeSinkSubtaskSleepMaxMs);
+    this.pipeSinkSubtaskSleepIntervalMaxMs = pipeSinkSubtaskSleepIntervalMaxMs;
+    logger.info(
+        "pipeSinkSubtaskSleepIntervalMaxMs is set to {}.", 
pipeSinkSubtaskSleepIntervalMaxMs);
   }
 
   public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ce09fb1f291..1ed1e39911f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,6 +143,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxWaitFinishTime();
   }
 
+  public long getPipeSinkSubtaskSleepIntervalInitMs() {
+    return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs();
+  }
+
+  public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+    return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs();
+  }
+
   /////////////////////////////// Source ///////////////////////////////
 
   public int getPipeSourceAssignerDisruptorRingBufferSize() {
@@ -484,6 +492,8 @@ public class PipeConfig {
         "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
         getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
     LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
+    LOGGER.info("PipeSinkSubtaskSleepIntervalInitMs: {}", 
getPipeSinkSubtaskSleepIntervalInitMs());
+    LOGGER.info("PipeSinkSubtaskSleepIntervalMaxMs: {}", 
getPipeSinkSubtaskSleepIntervalMaxMs());
 
     LOGGER.info(
         "PipeSourceAssignerDisruptorRingBufferSize: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 41fab677412..cbe4dd25e83 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -296,16 +296,16 @@ public class PipeDescriptor {
                 "pipe_retry_locally_for_user_conflict",
                 
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
 
-    config.setPipeSinkSubtaskSleepIntervalMs(
+    config.setPipeSinkSubtaskSleepIntervalInitMs(
         Long.parseLong(
             properties.getProperty(
-                "pipe_sink_subtask_sleep_interval_ms",
-                String.valueOf(config.getPipeSinkSubtaskSleepIntervalMs()))));
-    config.setPipeSinkSubtaskSleepMaxMs(
+                "pipe_sink_subtask_sleep_interval_init_ms",
+                
String.valueOf(config.getPipeSinkSubtaskSleepIntervalInitMs()))));
+    config.setPipeSinkSubtaskSleepIntervalMaxMs(
         Long.parseLong(
             properties.getProperty(
-                "pipe_sink_subtask_sleep_max_ms",
-                String.valueOf(config.getPipeSinkSubtaskSleepMaxMs()))));
+                "pipe_sink_subtask_sleep_interval_max_ms",
+                
String.valueOf(config.getPipeSinkSubtaskSleepIntervalMaxMs()))));
 
     config.setPipeSourceAssignerDisruptorRingBufferSize(
         Integer.parseInt(

Reply via email to