This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e34e0fcb929b1885aab0abdf315a7f52a10ae60a Author: Caideyipi <[email protected]> AuthorDate: Tue Nov 25 18:40:18 2025 +0800 Pipe: Adjusted the waiting time of temporary unavailable exceptions (#16798) * fff * fix * fix * fix * fix * f (cherry picked from commit 4525d0758673ddb6ae78f0bd2d7c365ec5ba2c8c) --- .../pipe/agent/task/PipeConfigNodeSubtask.java | 6 +- .../agent/task/subtask/sink/PipeSinkSubtask.java | 4 +- .../apache/iotdb/commons/conf/CommonConfig.java | 29 ++++++++ .../task/subtask/PipeAbstractSinkSubtask.java | 13 ++++ .../iotdb/commons/pipe/config/PipeConfig.java | 10 +++ .../iotdb/commons/pipe/config/PipeDescriptor.java | 11 +++ .../commons/pipe/task/PipeSleepIntervalTest.java | 83 ++++++++++++++++++++++ 7 files changed, 154 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java index 2403e353b56..e922c23321a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java @@ -19,11 +19,13 @@ package org.apache.iotdb.confignode.manager.pipe.agent.task; +import org.apache.iotdb.commons.exception.pipe.PipeNonReportException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; @@ -188,7 +190,9 @@ public class PipeConfigNodeSubtask extends PipeAbstractSinkSubtask { PipeConfigRegionSinkMetrics.getInstance().markConfigEvent(taskID); } decreaseReferenceCountAndReleaseLastEvent(event, true); - + sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs(); + } catch (final PipeNonReportException e) { + sleep4NonReportException(); } catch (final PipeException e) { setLastExceptionEvent(event); if (!isClosed.get()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 308ddfb90d2..8a86db3ded5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeNonReportException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; @@ -132,8 +133,9 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { } decreaseReferenceCountAndReleaseLastEvent(event, true); + sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs(); } catch (final PipeNonReportException e) { - // Ignore, go directly next round + sleep4NonReportException(); } catch (final PipeException e) { if (!isClosed.get()) { setLastExceptionEvent(event); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index d83e96eb100..cd0645adf4c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -254,6 +254,9 @@ public class CommonConfig { private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L; private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50; + private volatile long pipeSinkSubtaskSleepIntervalInitMs = 250L; + private volatile long pipeSinkSubtaskSleepIntervalMaxMs = 1000L; + private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20; private long pipeMaxWaitFinishTime = 10 * 1000; @@ -1415,6 +1418,32 @@ public class CommonConfig { "pipeRetryLocallyForParallelOrUserConflict is set to {}.", pipeSubtaskExecutorMaxThreadNum); } + public long getPipeSinkSubtaskSleepIntervalInitMs() { + return pipeSinkSubtaskSleepIntervalInitMs; + } + + public void setPipeSinkSubtaskSleepIntervalInitMs(long pipeSinkSubtaskSleepIntervalInitMs) { + if (this.pipeSinkSubtaskSleepIntervalInitMs == pipeSinkSubtaskSleepIntervalInitMs) { + return; + } + this.pipeSinkSubtaskSleepIntervalInitMs = pipeSinkSubtaskSleepIntervalInitMs; + logger.info( + "pipeSinkSubtaskSleepIntervalInitMs is set to {}.", pipeSinkSubtaskSleepIntervalInitMs); + } + + public long getPipeSinkSubtaskSleepIntervalMaxMs() { + return pipeSinkSubtaskSleepIntervalMaxMs; + } + + public void setPipeSinkSubtaskSleepIntervalMaxMs(long pipeSinkSubtaskSleepIntervalMaxMs) { + if (this.pipeSinkSubtaskSleepIntervalMaxMs == pipeSinkSubtaskSleepIntervalMaxMs) { + return; + } + this.pipeSinkSubtaskSleepIntervalMaxMs = pipeSinkSubtaskSleepIntervalMaxMs; + logger.info( + "pipeSinkSubtaskSleepIntervalMaxMs is set to {}.", pipeSinkSubtaskSleepIntervalMaxMs); + } + public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() { return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 66d43e0743c..09083d45bca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -53,6 +53,8 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { @SuppressWarnings("java:S3077") protected volatile Event lastExceptionEvent; + protected long sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs(); + protected PipeAbstractSinkSubtask( final String taskID, final long creationTime, final PipeConnector outputPipeSink) { super(taskID, creationTime); @@ -248,4 +250,15 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { lastExceptionEvent = null; } } + + public void sleep4NonReportException() { + if (sleepInterval < PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs()) { + sleepInterval <<= 1; + } + try { + Thread.sleep(sleepInterval); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index ce09fb1f291..1ed1e39911f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -143,6 +143,14 @@ public class PipeConfig { return COMMON_CONFIG.getPipeMaxWaitFinishTime(); } + public long getPipeSinkSubtaskSleepIntervalInitMs() { + return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs(); + } + + public long getPipeSinkSubtaskSleepIntervalMaxMs() { + return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs(); + } + /////////////////////////////// Source /////////////////////////////// public int getPipeSourceAssignerDisruptorRingBufferSize() { @@ -484,6 +492,8 @@ public class PipeConfig { "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}", getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()); LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime()); + LOGGER.info("PipeSinkSubtaskSleepIntervalInitMs: {}", getPipeSinkSubtaskSleepIntervalInitMs()); + LOGGER.info("PipeSinkSubtaskSleepIntervalMaxMs: {}", getPipeSinkSubtaskSleepIntervalMaxMs()); LOGGER.info( "PipeSourceAssignerDisruptorRingBufferSize: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 928ff5f25a5..cbe4dd25e83 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -296,6 +296,17 @@ public class PipeDescriptor { "pipe_retry_locally_for_user_conflict", String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict())))); + config.setPipeSinkSubtaskSleepIntervalInitMs( + Long.parseLong( + properties.getProperty( + "pipe_sink_subtask_sleep_interval_init_ms", + String.valueOf(config.getPipeSinkSubtaskSleepIntervalInitMs())))); + config.setPipeSinkSubtaskSleepIntervalMaxMs( + Long.parseLong( + properties.getProperty( + "pipe_sink_subtask_sleep_interval_max_ms", + String.valueOf(config.getPipeSinkSubtaskSleepIntervalMaxMs())))); + config.setPipeSourceAssignerDisruptorRingBufferSize( Integer.parseInt( Optional.ofNullable( diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java new file mode 100644 index 00000000000..07ae50e992e --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.task; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; +import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class PipeSleepIntervalTest { + private long oldPipeSinkSubtaskSleepIntervalInitMs; + private long oldPipeSinkSubtaskSleepIntervalMaxMs; + + @Before + public void setUp() throws Exception { + final CommonConfig config = CommonDescriptor.getInstance().getConfig(); + oldPipeSinkSubtaskSleepIntervalInitMs = config.getPipeSinkSubtaskSleepIntervalInitMs(); + oldPipeSinkSubtaskSleepIntervalMaxMs = config.getPipeSinkSubtaskSleepIntervalMaxMs(); + config.setPipeSinkSubtaskSleepIntervalInitMs(25L); + config.setPipeSinkSubtaskSleepIntervalMaxMs(50L); + } + + @After + public void tearDown() throws Exception { + final CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setPipeSinkSubtaskSleepIntervalInitMs(oldPipeSinkSubtaskSleepIntervalInitMs); + config.setPipeSinkSubtaskSleepIntervalMaxMs(oldPipeSinkSubtaskSleepIntervalMaxMs); + } + + @Test + public void test() { + try (final PipeAbstractSinkSubtask subtask = + new PipeAbstractSinkSubtask(null, 0, null) { + @Override + protected String getRootCause(Throwable throwable) { + return null; + } + + @Override + protected void report(EnrichedEvent event, PipeRuntimeException exception) {} + + @Override + protected boolean executeOnce() { + return false; + } + }) { + long startTime = System.currentTimeMillis(); + subtask.sleep4NonReportException(); + Assert.assertTrue( + System.currentTimeMillis() - startTime + >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()); + startTime = System.currentTimeMillis() - startTime; + subtask.sleep4NonReportException(); + Assert.assertTrue( + System.currentTimeMillis() - startTime + >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()); + } + } +}
