This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch wait_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/wait_time by this push:
     new ece280f05f9 fff
ece280f05f9 is described below

commit ece280f05f9810a48bf6188a09abb178b2c5b07c
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 25 10:29:17 2025 +0800

    fff
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 17 +++++++++++++-
 .../apache/iotdb/commons/conf/CommonConfig.java    | 27 ++++++++++++++++++++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 11 +++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 308ddfb90d2..26d64108724 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -51,6 +51,8 @@ import java.util.Objects;
 public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSinkSubtask.class);
+  private static final int SLEEP_MAX_INTERVAL_MS = 1000;
+  private static final int SLEEP_INIT_INTERVAL_MS = 250;
 
   // For input
   protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
@@ -58,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
   // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private final String attributeSortedString;
   private final int connectorIndex;
+  private int sleepInterval = SLEEP_INIT_INTERVAL_MS;
 
   // Now parallel connectors run the same time, thus the heartbeat events are 
not sure
   // to trigger the general event transfer function, causing potentially such 
as
@@ -132,8 +135,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       }
 
       decreaseReferenceCountAndReleaseLastEvent(event, true);
+      sleepInterval = SLEEP_INIT_INTERVAL_MS;
     } catch (final PipeNonReportException e) {
-      // Ignore, go directly next round
+      sleep4NonReportException();
     } catch (final PipeException e) {
       if (!isClosed.get()) {
         setLastExceptionEvent(event);
@@ -223,6 +227,17 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     }
   }
 
+  /** Sleeps for the current back-off interval, then doubles it up to the maximum. */
+  private void sleep4NonReportException() {
+    try {
+      Thread.sleep(sleepInterval);
+    } catch (final InterruptedException e) {
+      // Thread.sleep clears the interrupt status when it throws; set it again.
+      Thread.currentThread().interrupt();
+    }
+    sleepInterval = Math.min(sleepInterval << 1, SLEEP_MAX_INTERVAL_MS);
+  }
+
   /**
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
    * its queued events in the output pipe connector.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d83e96eb100..788d27f3ccf 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,6 +254,9 @@ public class CommonConfig {
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
 
+  private long pipeSinkSubtaskSleepIntervalMs = 250L;
+  private long pipeSinkSubtaskSleepMaxMs = 1000L;
+
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -1415,6 +1418,30 @@ public class CommonConfig {
         "pipeRetryLocallyForParallelOrUserConflict is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
   }
 
+  public long getPipeSinkSubtaskSleepIntervalMs() {
+    return pipeSinkSubtaskSleepIntervalMs;
+  }
+
+  public void setPipeSinkSubtaskSleepIntervalMs(long 
pipeSinkSubtaskSleepIntervalMs) {
+    if (this.pipeSinkSubtaskSleepIntervalMs == pipeSinkSubtaskSleepIntervalMs) 
{
+      return;
+    }
+    this.pipeSinkSubtaskSleepIntervalMs = pipeSinkSubtaskSleepIntervalMs;
+    logger.info("pipeSinkSubtaskSleepIntervalMs is set to {}.", 
pipeSinkSubtaskSleepIntervalMs);
+  }
+
+  public long getPipeSinkSubtaskSleepMaxMs() {
+    return pipeSinkSubtaskSleepMaxMs;
+  }
+
+  public void setPipeSinkSubtaskSleepMaxMs(long pipeSinkSubtaskSleepMaxMs) {
+    if (this.pipeSinkSubtaskSleepMaxMs == pipeSinkSubtaskSleepMaxMs) {
+      return;
+    }
+    this.pipeSinkSubtaskSleepMaxMs = pipeSinkSubtaskSleepMaxMs;
+    logger.info("pipeSinkSubtaskSleepMaxMs is set to {}.", 
pipeSinkSubtaskSleepMaxMs);
+  }
+
   public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
     return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 928ff5f25a5..41fab677412 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -296,6 +296,17 @@ public class PipeDescriptor {
                 "pipe_retry_locally_for_user_conflict",
                 
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
 
+    config.setPipeSinkSubtaskSleepIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_sink_subtask_sleep_interval_ms",
+                String.valueOf(config.getPipeSinkSubtaskSleepIntervalMs()))));
+    config.setPipeSinkSubtaskSleepMaxMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_sink_subtask_sleep_max_ms",
+                String.valueOf(config.getPipeSinkSubtaskSleepMaxMs()))));
+
     config.setPipeSourceAssignerDisruptorRingBufferSize(
         Integer.parseInt(
             Optional.ofNullable(

Reply via email to