This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch wait_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/wait_time by this push:
new 7a4ba437070 fix
7a4ba437070 is described below
commit 7a4ba437070b9fe1acc20129c3b8d2287cab12df
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 25 10:34:48 2025 +0800
fix
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 9 +++----
.../apache/iotdb/commons/conf/CommonConfig.java | 30 ++++++++++++----------
.../iotdb/commons/pipe/config/PipeConfig.java | 10 ++++++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 12 ++++-----
4 files changed, 36 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 26d64108724..b23a6f2ed49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -51,8 +52,6 @@ import java.util.Objects;
public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSinkSubtask.class);
- private static final int SLEEP_MAX_INTERVAL_MS = 1000;
- private static final int SLEEP_INIT_INTERVAL_MS = 250;
// For input
protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
@@ -60,7 +59,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String attributeSortedString;
private final int connectorIndex;
- private int sleepInterval = SLEEP_INIT_INTERVAL_MS;
+ private long sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
// Now parallel connectors run the same time, thus the heartbeat events are
not sure
// to trigger the general event transfer function, causing potentially such
as
@@ -135,7 +134,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
- sleepInterval = SLEEP_INIT_INTERVAL_MS;
+ sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
} catch (final PipeNonReportException e) {
sleep4NonReportException();
} catch (final PipeException e) {
@@ -228,7 +227,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
private void sleep4NonReportException() {
- if (sleepInterval < SLEEP_MAX_INTERVAL_MS) {
+ if (sleepInterval <
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs()) {
sleepInterval <<= 1;
}
try {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 788d27f3ccf..4e748ce647c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,8 +254,8 @@ public class CommonConfig {
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
- private long pipeSinkSubtaskSleepIntervalMs = 250L;
- private long pipeSinkSubtaskSleepMaxMs = 1000L;
+ private long pipeSinkSubtaskSleepIntervalInitMs = 250L;
+ private long pipeSinkSubtaskSleepIntervalMaxMs = 1000L;
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
@@ -1418,28 +1418,30 @@ public class CommonConfig {
"pipeRetryLocallyForParallelOrUserConflict is set to {}.",
pipeSubtaskExecutorMaxThreadNum);
}
- public long getPipeSinkSubtaskSleepIntervalMs() {
- return pipeSinkSubtaskSleepIntervalMs;
+ public long getPipeSinkSubtaskSleepIntervalInitMs() {
+ return pipeSinkSubtaskSleepIntervalInitMs;
}
- public void setPipeSinkSubtaskSleepIntervalMs(long
pipeSinkSubtaskSleepIntervalMs) {
- if (this.pipeSinkSubtaskSleepIntervalMs == pipeSinkSubtaskSleepIntervalMs)
{
+ public void setPipeSinkSubtaskSleepIntervalInitMs(long
pipeSinkSubtaskSleepIntervalInitMs) {
+ if (this.pipeSinkSubtaskSleepIntervalInitMs ==
pipeSinkSubtaskSleepIntervalInitMs) {
return;
}
- this.pipeSinkSubtaskSleepIntervalMs = pipeSinkSubtaskSleepIntervalMs;
- logger.info("pipeSinkSubtaskSleepIntervalMs is set to {}.",
pipeSinkSubtaskSleepIntervalMs);
+ this.pipeSinkSubtaskSleepIntervalInitMs =
pipeSinkSubtaskSleepIntervalInitMs;
+ logger.info(
+ "pipeSinkSubtaskSleepIntervalInitMs is set to {}.",
pipeSinkSubtaskSleepIntervalInitMs);
}
- public long getPipeSinkSubtaskSleepMaxMs() {
- return pipeSinkSubtaskSleepMaxMs;
+ public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+ return pipeSinkSubtaskSleepIntervalMaxMs;
}
- public void setPipeSinkSubtaskSleepMaxMs(long pipeSinkSubtaskSleepMaxMs) {
- if (this.pipeSinkSubtaskSleepMaxMs == pipeSinkSubtaskSleepMaxMs) {
+ public void setPipeSinkSubtaskSleepIntervalMaxMs(long
pipeSinkSubtaskSleepIntervalMaxMs) {
+ if (this.pipeSinkSubtaskSleepIntervalMaxMs ==
pipeSinkSubtaskSleepIntervalMaxMs) {
return;
}
- this.pipeSinkSubtaskSleepMaxMs = pipeSinkSubtaskSleepMaxMs;
- logger.info("pipeSinkSubtaskSleepMaxMs is set to {}.",
pipeSinkSubtaskSleepMaxMs);
+ this.pipeSinkSubtaskSleepIntervalMaxMs = pipeSinkSubtaskSleepIntervalMaxMs;
+ logger.info(
+ "pipeSinkSubtaskSleepIntervalMaxMs is set to {}.",
pipeSinkSubtaskSleepIntervalMaxMs);
}
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ce09fb1f291..1ed1e39911f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,6 +143,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
}
+ public long getPipeSinkSubtaskSleepIntervalInitMs() {
+ return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs();
+ }
+
+ public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+ return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs();
+ }
+
/////////////////////////////// Source ///////////////////////////////
public int getPipeSourceAssignerDisruptorRingBufferSize() {
@@ -484,6 +492,8 @@ public class PipeConfig {
"PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
+ LOGGER.info("PipeSinkSubtaskSleepIntervalInitMs: {}",
getPipeSinkSubtaskSleepIntervalInitMs());
+ LOGGER.info("PipeSinkSubtaskSleepIntervalMaxMs: {}",
getPipeSinkSubtaskSleepIntervalMaxMs());
LOGGER.info(
"PipeSourceAssignerDisruptorRingBufferSize: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 41fab677412..cbe4dd25e83 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -296,16 +296,16 @@ public class PipeDescriptor {
"pipe_retry_locally_for_user_conflict",
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
- config.setPipeSinkSubtaskSleepIntervalMs(
+ config.setPipeSinkSubtaskSleepIntervalInitMs(
Long.parseLong(
properties.getProperty(
- "pipe_sink_subtask_sleep_interval_ms",
- String.valueOf(config.getPipeSinkSubtaskSleepIntervalMs()))));
- config.setPipeSinkSubtaskSleepMaxMs(
+ "pipe_sink_subtask_sleep_interval_init_ms",
+
String.valueOf(config.getPipeSinkSubtaskSleepIntervalInitMs()))));
+ config.setPipeSinkSubtaskSleepIntervalMaxMs(
Long.parseLong(
properties.getProperty(
- "pipe_sink_subtask_sleep_max_ms",
- String.valueOf(config.getPipeSinkSubtaskSleepMaxMs()))));
+ "pipe_sink_subtask_sleep_interval_max_ms",
+
String.valueOf(config.getPipeSinkSubtaskSleepIntervalMaxMs()))));
config.setPipeSourceAssignerDisruptorRingBufferSize(
Integer.parseInt(