This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch wait_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/wait_time by this push:
new ece280f05f9 fff
ece280f05f9 is described below
commit ece280f05f9810a48bf6188a09abb178b2c5b07c
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 25 10:29:17 2025 +0800
fff
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 17 +++++++++++++-
.../apache/iotdb/commons/conf/CommonConfig.java | 27 ++++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 11 +++++++++
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 308ddfb90d2..26d64108724 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -51,6 +51,8 @@ import java.util.Objects;
public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSinkSubtask.class);
+ // Threshold, in milliseconds, at which the back-off interval used by
+ // sleep4NonReportException() stops growing.
+ private static final int SLEEP_MAX_INTERVAL_MS = 1000;
+ // Value, in milliseconds, that sleepInterval starts from and is reset to after a round
+ // finishes without an exception.
+ private static final int SLEEP_INIT_INTERVAL_MS = 250;
// For input
protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
@@ -58,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String attributeSortedString;
private final int connectorIndex;
+ // Current back-off duration (ms) passed to Thread.sleep() in sleep4NonReportException().
+ private int sleepInterval = SLEEP_INIT_INTERVAL_MS;
// Now parallel connectors run the same time, thus the heartbeat events are
not sure
// to trigger the general event transfer function, causing potentially such
as
@@ -132,8 +135,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
+ sleepInterval = SLEEP_INIT_INTERVAL_MS;
} catch (final PipeNonReportException e) {
- // Ignore, go directly next round
+ sleep4NonReportException();
} catch (final PipeException e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
@@ -223,6 +227,17 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
}
+ /**
+  * Backs off after a {@code PipeNonReportException} instead of retrying immediately.
+  *
+  * <p>Sleeps for the current {@code sleepInterval} first and only then doubles it for the next
+  * consecutive failure, so the very first back-off really lasts {@code SLEEP_INIT_INTERVAL_MS}.
+  * The doubled value is clamped to {@code SLEEP_MAX_INTERVAL_MS}, so the interval can never
+  * overshoot the maximum, whatever the two constants are set to.
+  *
+  * <p>If the thread is interrupted while sleeping, the interrupt flag is restored so that the
+  * caller can observe it.
+  */
+ private void sleep4NonReportException() {
+   try {
+     Thread.sleep(sleepInterval);
+   } catch (final InterruptedException e) {
+     // Preserve the interrupt status for the code further up the stack
+     Thread.currentThread().interrupt();
+   }
+   // Exponential back-off for the next round, capped at the configured maximum
+   sleepInterval = Math.min(sleepInterval << 1, SLEEP_MAX_INTERVAL_MS);
+ }
+
/**
* When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
* its queued events in the output pipe connector.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d83e96eb100..788d27f3ccf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,6 +254,9 @@ public class CommonConfig {
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
+ // Sleep settings for the pipe sink subtask, in milliseconds (per the field names).
+ // NOTE(review): presumably the initial and maximum back-off intervals used when the sink
+ // subtask sleeps between retries — confirm against the code that reads these values.
+ private long pipeSinkSubtaskSleepIntervalMs = 250L;
+ private long pipeSinkSubtaskSleepMaxMs = 1000L;
+
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -1415,6 +1418,30 @@ public class CommonConfig {
"pipeRetryLocallyForParallelOrUserConflict is set to {}.",
pipeSubtaskExecutorMaxThreadNum);
}
+ /**
+  * Returns the current value of {@code pipeSinkSubtaskSleepIntervalMs} (milliseconds, per the
+  * name).
+  */
+ public long getPipeSinkSubtaskSleepIntervalMs() {
+ return pipeSinkSubtaskSleepIntervalMs;
+ }
+
+ /**
+  * Updates {@code pipeSinkSubtaskSleepIntervalMs} and logs the new value.
+  *
+  * <p>Nothing is assigned or logged when the given value equals the current one.
+  *
+  * @param pipeSinkSubtaskSleepIntervalMs the new value to store
+  */
+ public void setPipeSinkSubtaskSleepIntervalMs(long pipeSinkSubtaskSleepIntervalMs) {
+   final boolean changed = this.pipeSinkSubtaskSleepIntervalMs != pipeSinkSubtaskSleepIntervalMs;
+   if (changed) {
+     this.pipeSinkSubtaskSleepIntervalMs = pipeSinkSubtaskSleepIntervalMs;
+     logger.info("pipeSinkSubtaskSleepIntervalMs is set to {}.", pipeSinkSubtaskSleepIntervalMs);
+   }
+ }
+
+ /**
+  * Returns the current value of {@code pipeSinkSubtaskSleepMaxMs} (milliseconds, per the name).
+  */
+ public long getPipeSinkSubtaskSleepMaxMs() {
+ return pipeSinkSubtaskSleepMaxMs;
+ }
+
+ /**
+  * Updates {@code pipeSinkSubtaskSleepMaxMs} and logs the new value.
+  *
+  * <p>Nothing is assigned or logged when the given value equals the current one.
+  *
+  * @param pipeSinkSubtaskSleepMaxMs the new value to store
+  */
+ public void setPipeSinkSubtaskSleepMaxMs(long pipeSinkSubtaskSleepMaxMs) {
+   final boolean changed = this.pipeSinkSubtaskSleepMaxMs != pipeSinkSubtaskSleepMaxMs;
+   if (changed) {
+     this.pipeSinkSubtaskSleepMaxMs = pipeSinkSubtaskSleepMaxMs;
+     logger.info("pipeSinkSubtaskSleepMaxMs is set to {}.", pipeSinkSubtaskSleepMaxMs);
+   }
+ }
+
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 928ff5f25a5..41fab677412 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -296,6 +296,17 @@ public class PipeDescriptor {
"pipe_retry_locally_for_user_conflict",
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
+ config.setPipeSinkSubtaskSleepIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_sink_subtask_sleep_interval_ms",
+ String.valueOf(config.getPipeSinkSubtaskSleepIntervalMs()))));
+ config.setPipeSinkSubtaskSleepMaxMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_sink_subtask_sleep_max_ms",
+ String.valueOf(config.getPipeSinkSubtaskSleepMaxMs()))));
+
config.setPipeSourceAssignerDisruptorRingBufferSize(
Integer.parseInt(
Optional.ofNullable(