This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch wait-time-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/wait-time-13 by this push:
new c6e13ff41dc Pipe: Adjusted the waiting time of temporary unavailable
exceptions (#16798)
c6e13ff41dc is described below
commit c6e13ff41dcc42ca405e97cc945a28f4a9fbb690
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 25 18:40:18 2025 +0800
Pipe: Adjusted the waiting time of temporary unavailable exceptions (#16798)
* fff
* fix
* fix
* fix
* fix
* f
---
.../pipe/agent/task/PipeConfigNodeSubtask.java | 5 ++
.../agent/task/subtask/sink/PipeSinkSubtask.java | 4 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 29 ++++++++
.../task/subtask/PipeAbstractSinkSubtask.java | 13 ++++
.../iotdb/commons/pipe/config/PipeConfig.java | 10 +++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 11 +++
.../commons/pipe/task/PipeSleepIntervalTest.java | 83 ++++++++++++++++++++++
7 files changed, 154 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 890749f416e..66778f25893 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.confignode.manager.pipe.agent.task;
+import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -189,6 +191,9 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
PipeConfigRegionSinkMetrics.getInstance().markConfigEvent(taskID);
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
+ sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+ } catch (final PipeNonReportException e) {
+ sleep4NonReportException();
} catch (final PipeException e) {
setLastExceptionEvent(event);
if (!isClosed.get()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index c4a1d0fecc3..5b917b5b4e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -132,8 +133,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
+ sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
} catch (final PipeNonReportException e) {
- // Ignore, go directly next round
+ sleep4NonReportException();
} catch (final PipeException e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9951fe217a8..f831125d46d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,9 @@ public class CommonConfig {
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
+ private volatile long pipeSinkSubtaskSleepIntervalInitMs = 250L;
+ private volatile long pipeSinkSubtaskSleepIntervalMaxMs = 1000L;
+
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -1359,6 +1362,32 @@ public class CommonConfig {
"pipeRetryLocallyForParallelOrUserConflict is set to {}.",
pipeSubtaskExecutorMaxThreadNum);
}
+ /**
+  * Returns the initial sleep interval, in milliseconds, configured for pipe sink subtasks.
+  *
+  * @return the current value of {@code pipeSinkSubtaskSleepIntervalInitMs}
+  */
+ public long getPipeSinkSubtaskSleepIntervalInitMs() {
+   return this.pipeSinkSubtaskSleepIntervalInitMs;
+ }
+
+ /**
+  * Updates the initial sleep interval for pipe sink subtasks and logs the new value.
+  *
+  * <p>Nothing is written or logged when the supplied value equals the current one.
+  *
+  * @param pipeSinkSubtaskSleepIntervalInitMs the new initial interval, in milliseconds
+  */
+ public void setPipeSinkSubtaskSleepIntervalInitMs(long pipeSinkSubtaskSleepIntervalInitMs) {
+   final boolean changed =
+       this.pipeSinkSubtaskSleepIntervalInitMs != pipeSinkSubtaskSleepIntervalInitMs;
+   if (changed) {
+     this.pipeSinkSubtaskSleepIntervalInitMs = pipeSinkSubtaskSleepIntervalInitMs;
+     logger.info(
+         "pipeSinkSubtaskSleepIntervalInitMs is set to {}.", pipeSinkSubtaskSleepIntervalInitMs);
+   }
+ }
+
+ /**
+  * Returns the maximum sleep interval, in milliseconds, configured for pipe sink subtasks.
+  *
+  * @return the current value of {@code pipeSinkSubtaskSleepIntervalMaxMs}
+  */
+ public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+   return this.pipeSinkSubtaskSleepIntervalMaxMs;
+ }
+
+ /**
+  * Updates the maximum sleep interval for pipe sink subtasks and logs the new value.
+  *
+  * <p>Nothing is written or logged when the supplied value equals the current one.
+  *
+  * @param pipeSinkSubtaskSleepIntervalMaxMs the new maximum interval, in milliseconds
+  */
+ public void setPipeSinkSubtaskSleepIntervalMaxMs(long pipeSinkSubtaskSleepIntervalMaxMs) {
+   final boolean changed =
+       this.pipeSinkSubtaskSleepIntervalMaxMs != pipeSinkSubtaskSleepIntervalMaxMs;
+   if (changed) {
+     this.pipeSinkSubtaskSleepIntervalMaxMs = pipeSinkSubtaskSleepIntervalMaxMs;
+     logger.info(
+         "pipeSinkSubtaskSleepIntervalMaxMs is set to {}.", pipeSinkSubtaskSleepIntervalMaxMs);
+   }
+ }
+
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 62cce7438ba..87bb0c95949 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -53,6 +53,8 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
@SuppressWarnings("java:S3077")
protected volatile Event lastExceptionEvent;
+ protected long sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+
protected PipeAbstractSinkSubtask(
final String taskID, final long creationTime, final PipeConnector
outputPipeConnector) {
super(taskID, creationTime);
@@ -248,4 +250,15 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
lastExceptionEvent = null;
}
}
+
+ /**
+  * Pauses the calling thread before the next attempt, growing the pause on every call.
+  *
+  * <p>While {@code sleepInterval} is below the configured maximum it is doubled, and the doubled
+  * value is clamped so that it never exceeds that maximum. The thread then sleeps for
+  * {@code sleepInterval} milliseconds. This method never shrinks the interval; callers reset it
+  * themselves.
+  *
+  * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the method
+  * returns early.
+  */
+ public void sleep4NonReportException() {
+   final long maxIntervalMs = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs();
+   if (sleepInterval < maxIntervalMs) {
+     // A plain doubling can step past the maximum (e.g. 300 -> 600 -> 1200 with max 1000),
+     // so clamp the doubled value to the configured upper bound.
+     sleepInterval = Math.min(sleepInterval << 1, maxIntervalMs);
+   }
+   try {
+     Thread.sleep(sleepInterval);
+   } catch (final InterruptedException e) {
+     // Preserve the interrupt status for code further up the stack.
+     Thread.currentThread().interrupt();
+   }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1ab26170d19..a815800e23f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,6 +143,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
}
+ /**
+  * Returns the initial sleep interval, in milliseconds, for pipe sink subtasks.
+  *
+  * @return the value reported by {@code COMMON_CONFIG}
+  */
+ public long getPipeSinkSubtaskSleepIntervalInitMs() {
+   final long initIntervalMs = COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs();
+   return initIntervalMs;
+ }
+
+ /**
+  * Returns the maximum sleep interval, in milliseconds, for pipe sink subtasks.
+  *
+  * @return the value reported by {@code COMMON_CONFIG}
+  */
+ public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+   final long maxIntervalMs = COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs();
+   return maxIntervalMs;
+ }
+
/////////////////////////////// Source ///////////////////////////////
public int getPipeSourceAssignerDisruptorRingBufferSize() {
@@ -484,6 +492,8 @@ public class PipeConfig {
"PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
+ LOGGER.info("PipeSinkSubtaskSleepIntervalInitMs: {}",
getPipeSinkSubtaskSleepIntervalInitMs());
+ LOGGER.info("PipeSinkSubtaskSleepIntervalMaxMs: {}",
getPipeSinkSubtaskSleepIntervalMaxMs());
LOGGER.info(
"PipeSourceAssignerDisruptorRingBufferSize: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index ec6860be3fb..9c46978b7d4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -296,6 +296,17 @@ public class PipeDescriptor {
"pipe_retry_locally_for_user_conflict",
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
+ config.setPipeSinkSubtaskSleepIntervalInitMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_sink_subtask_sleep_interval_init_ms",
+
String.valueOf(config.getPipeSinkSubtaskSleepIntervalInitMs()))));
+ config.setPipeSinkSubtaskSleepIntervalMaxMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_sink_subtask_sleep_interval_max_ms",
+
String.valueOf(config.getPipeSinkSubtaskSleepIntervalMaxMs()))));
+
config.setPipeSourceAssignerDisruptorRingBufferSize(
Integer.parseInt(
Optional.ofNullable(
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
new file mode 100644
index 00000000000..07ae50e992e
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@code PipeAbstractSinkSubtask#sleep4NonReportException()} blocks the caller for at
+ * least the configured initial sleep interval on consecutive calls.
+ *
+ * <p>The test temporarily lowers the init/max intervals to 25 ms / 50 ms so that it runs quickly,
+ * and restores the previous values afterwards.
+ */
+public class PipeSleepIntervalTest {
+  private long oldPipeSinkSubtaskSleepIntervalInitMs;
+  private long oldPipeSinkSubtaskSleepIntervalMaxMs;
+
+  /** Saves the current interval settings and installs short ones for the test. */
+  @Before
+  public void setUp() throws Exception {
+    final CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    oldPipeSinkSubtaskSleepIntervalInitMs = config.getPipeSinkSubtaskSleepIntervalInitMs();
+    oldPipeSinkSubtaskSleepIntervalMaxMs = config.getPipeSinkSubtaskSleepIntervalMaxMs();
+    config.setPipeSinkSubtaskSleepIntervalInitMs(25L);
+    config.setPipeSinkSubtaskSleepIntervalMaxMs(50L);
+  }
+
+  /** Restores the interval settings captured in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    final CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    config.setPipeSinkSubtaskSleepIntervalInitMs(oldPipeSinkSubtaskSleepIntervalInitMs);
+    config.setPipeSinkSubtaskSleepIntervalMaxMs(oldPipeSinkSubtaskSleepIntervalMaxMs);
+  }
+
+  @Test
+  public void test() {
+    try (final PipeAbstractSinkSubtask subtask =
+        new PipeAbstractSinkSubtask(null, 0, null) {
+          @Override
+          protected String getRootCause(Throwable throwable) {
+            return null;
+          }
+
+          @Override
+          protected void report(EnrichedEvent event, PipeRuntimeException exception) {}
+
+          @Override
+          protected boolean executeOnce() {
+            return false;
+          }
+        }) {
+      long startTime = System.currentTimeMillis();
+      subtask.sleep4NonReportException();
+      Assert.assertTrue(
+          System.currentTimeMillis() - startTime
+              >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs());
+
+      // Restart the clock so the second assertion measures only the second sleep. Storing the
+      // elapsed duration here instead would make the check below trivially true.
+      startTime = System.currentTimeMillis();
+      subtask.sleep4NonReportException();
+      Assert.assertTrue(
+          System.currentTimeMillis() - startTime
+              >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs());
+    }
+  }
+}