This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e34e0fcb929b1885aab0abdf315a7f52a10ae60a
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 25 18:40:18 2025 +0800

    Pipe: Adjusted the waiting time of temporary unavailable exceptions (#16798)
    
    * fff
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * f
    
    (cherry picked from commit 4525d0758673ddb6ae78f0bd2d7c365ec5ba2c8c)
---
 .../pipe/agent/task/PipeConfigNodeSubtask.java     |  6 +-
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  4 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    | 29 ++++++++
 .../task/subtask/PipeAbstractSinkSubtask.java      | 13 ++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 10 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 11 +++
 .../commons/pipe/task/PipeSleepIntervalTest.java   | 83 ++++++++++++++++++++++
 7 files changed, 154 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 2403e353b56..e922c23321a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.confignode.manager.pipe.agent.task;
 
+import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -188,7 +190,9 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
         PipeConfigRegionSinkMetrics.getInstance().markConfigEvent(taskID);
       }
       decreaseReferenceCountAndReleaseLastEvent(event, true);
-
+      sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+    } catch (final PipeNonReportException e) {
+      sleep4NonReportException();
     } catch (final PipeException e) {
       setLastExceptionEvent(event);
       if (!isClosed.get()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 308ddfb90d2..8a86db3ded5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -132,8 +133,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       }
 
       decreaseReferenceCountAndReleaseLastEvent(event, true);
+      sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
     } catch (final PipeNonReportException e) {
-      // Ignore, go directly next round
+      sleep4NonReportException();
     } catch (final PipeException e) {
       if (!isClosed.get()) {
         setLastExceptionEvent(event);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d83e96eb100..cd0645adf4c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,6 +254,9 @@ public class CommonConfig {
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
 
+  private volatile long pipeSinkSubtaskSleepIntervalInitMs = 250L;
+  private volatile long pipeSinkSubtaskSleepIntervalMaxMs = 1000L;
+
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -1415,6 +1418,32 @@ public class CommonConfig {
         "pipeRetryLocallyForParallelOrUserConflict is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
   }
 
+  /**
+   * Returns the current value of {@code pipeSinkSubtaskSleepIntervalInitMs}, the initial sleep
+   * interval (in milliseconds) used by pipe sink subtasks.
+   *
+   * @return the configured initial sleep interval in milliseconds
+   */
+  public long getPipeSinkSubtaskSleepIntervalInitMs() {
+    return pipeSinkSubtaskSleepIntervalInitMs;
+  }
+
+  /**
+   * Updates the initial sleep interval of pipe sink subtasks.
+   *
+   * <p>The field is written and an INFO line is logged only when the new value differs from the
+   * one currently stored; setting the same value again is a silent no-op.
+   *
+   * @param pipeSinkSubtaskSleepIntervalInitMs the new initial sleep interval in milliseconds
+   */
+  public void setPipeSinkSubtaskSleepIntervalInitMs(long pipeSinkSubtaskSleepIntervalInitMs) {
+    final boolean changed =
+        this.pipeSinkSubtaskSleepIntervalInitMs != pipeSinkSubtaskSleepIntervalInitMs;
+    if (changed) {
+      this.pipeSinkSubtaskSleepIntervalInitMs = pipeSinkSubtaskSleepIntervalInitMs;
+      logger.info(
+          "pipeSinkSubtaskSleepIntervalInitMs is set to {}.", pipeSinkSubtaskSleepIntervalInitMs);
+    }
+  }
+
+  /**
+   * Returns the current value of {@code pipeSinkSubtaskSleepIntervalMaxMs}, the upper limit (in
+   * milliseconds) against which pipe sink subtasks compare their sleep interval.
+   *
+   * @return the configured maximum sleep interval in milliseconds
+   */
+  public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+    return pipeSinkSubtaskSleepIntervalMaxMs;
+  }
+
+  /**
+   * Updates the maximum sleep interval of pipe sink subtasks.
+   *
+   * <p>The field is written and an INFO line is logged only when the new value differs from the
+   * one currently stored; setting the same value again is a silent no-op.
+   *
+   * @param pipeSinkSubtaskSleepIntervalMaxMs the new maximum sleep interval in milliseconds
+   */
+  public void setPipeSinkSubtaskSleepIntervalMaxMs(long pipeSinkSubtaskSleepIntervalMaxMs) {
+    final boolean changed =
+        this.pipeSinkSubtaskSleepIntervalMaxMs != pipeSinkSubtaskSleepIntervalMaxMs;
+    if (changed) {
+      this.pipeSinkSubtaskSleepIntervalMaxMs = pipeSinkSubtaskSleepIntervalMaxMs;
+      logger.info(
+          "pipeSinkSubtaskSleepIntervalMaxMs is set to {}.", pipeSinkSubtaskSleepIntervalMaxMs);
+    }
+  }
+
   public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
     return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 66d43e0743c..09083d45bca 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -53,6 +53,8 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   @SuppressWarnings("java:S3077")
   protected volatile Event lastExceptionEvent;
 
+  protected long sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+
   protected PipeAbstractSinkSubtask(
       final String taskID, final long creationTime, final PipeConnector 
outputPipeSink) {
     super(taskID, creationTime);
@@ -248,4 +250,15 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
       lastExceptionEvent = null;
     }
   }
+
+  /**
+   * Blocks the calling thread for the current back-off interval; invoked by the sink subtasks when
+   * they catch a {@code PipeNonReportException}.
+   *
+   * <p>While {@code sleepInterval} is below the configured maximum ({@code
+   * PipeConfig#getPipeSinkSubtaskSleepIntervalMaxMs()}), it is doubled before sleeping. The doubled
+   * value is clamped to that maximum, so the back-off never overshoots the limit even when the
+   * initial interval is not the maximum divided by a power of two (e.g. 300 -> 600 -> 1000 instead
+   * of 300 -> 600 -> 1200 for a maximum of 1000). An interval that already is at or above the
+   * maximum is used unchanged.
+   *
+   * <p>If the thread is interrupted while sleeping, the method returns early and restores the
+   * thread's interrupt flag.
+   */
+  public void sleep4NonReportException() {
+    final long maxIntervalMs = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs();
+    if (sleepInterval < maxIntervalMs) {
+      // Exponential growth, capped so the configured maximum is a real upper bound.
+      sleepInterval = Math.min(sleepInterval << 1, maxIntervalMs);
+    }
+    try {
+      Thread.sleep(sleepInterval);
+    } catch (final InterruptedException e) {
+      // Preserve the interrupt for callers instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ce09fb1f291..1ed1e39911f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,6 +143,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxWaitFinishTime();
   }
 
+  /**
+   * Returns the initial sleep interval of pipe sink subtasks, in milliseconds, as currently held
+   * by {@code COMMON_CONFIG}.
+   *
+   * @return the value of {@code COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs()}
+   */
+  public long getPipeSinkSubtaskSleepIntervalInitMs() {
+    return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs();
+  }
+
+  /**
+   * Returns the maximum sleep interval of pipe sink subtasks, in milliseconds, as currently held
+   * by {@code COMMON_CONFIG}.
+   *
+   * @return the value of {@code COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs()}
+   */
+  public long getPipeSinkSubtaskSleepIntervalMaxMs() {
+    return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs();
+  }
+
   /////////////////////////////// Source ///////////////////////////////
 
   public int getPipeSourceAssignerDisruptorRingBufferSize() {
@@ -484,6 +492,8 @@ public class PipeConfig {
         "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
         getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
     LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
+    LOGGER.info("PipeSinkSubtaskSleepIntervalInitMs: {}", 
getPipeSinkSubtaskSleepIntervalInitMs());
+    LOGGER.info("PipeSinkSubtaskSleepIntervalMaxMs: {}", 
getPipeSinkSubtaskSleepIntervalMaxMs());
 
     LOGGER.info(
         "PipeSourceAssignerDisruptorRingBufferSize: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 928ff5f25a5..cbe4dd25e83 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -296,6 +296,17 @@ public class PipeDescriptor {
                 "pipe_retry_locally_for_user_conflict",
                 
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
 
+    config.setPipeSinkSubtaskSleepIntervalInitMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_sink_subtask_sleep_interval_init_ms",
+                
String.valueOf(config.getPipeSinkSubtaskSleepIntervalInitMs()))));
+    config.setPipeSinkSubtaskSleepIntervalMaxMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_sink_subtask_sleep_interval_max_ms",
+                
String.valueOf(config.getPipeSinkSubtaskSleepIntervalMaxMs()))));
+
     config.setPipeSourceAssignerDisruptorRingBufferSize(
         Integer.parseInt(
             Optional.ofNullable(
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
new file mode 100644
index 00000000000..07ae50e992e
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link PipeAbstractSinkSubtask#sleep4NonReportException()} blocks for at least the
+ * configured initial sleep interval on each call.
+ *
+ * <p>The sleep-interval settings are shrunk for the duration of the test (25 ms initial, 50 ms
+ * maximum) and restored afterwards so other tests see the previous configuration.
+ */
+public class PipeSleepIntervalTest {
+  private long oldPipeSinkSubtaskSleepIntervalInitMs;
+  private long oldPipeSinkSubtaskSleepIntervalMaxMs;
+
+  /** Remembers the current settings and installs short intervals to keep the test fast. */
+  @Before
+  public void setUp() throws Exception {
+    final CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    oldPipeSinkSubtaskSleepIntervalInitMs = config.getPipeSinkSubtaskSleepIntervalInitMs();
+    oldPipeSinkSubtaskSleepIntervalMaxMs = config.getPipeSinkSubtaskSleepIntervalMaxMs();
+    config.setPipeSinkSubtaskSleepIntervalInitMs(25L);
+    config.setPipeSinkSubtaskSleepIntervalMaxMs(50L);
+  }
+
+  /** Restores the settings captured in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    final CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    config.setPipeSinkSubtaskSleepIntervalInitMs(oldPipeSinkSubtaskSleepIntervalInitMs);
+    config.setPipeSinkSubtaskSleepIntervalMaxMs(oldPipeSinkSubtaskSleepIntervalMaxMs);
+  }
+
+  /**
+   * Calls {@code sleep4NonReportException()} twice and verifies that each individual call takes
+   * at least the configured initial interval.
+   */
+  @Test
+  public void test() {
+    try (final PipeAbstractSinkSubtask subtask =
+        new PipeAbstractSinkSubtask(null, 0, null) {
+          @Override
+          protected String getRootCause(Throwable throwable) {
+            return null;
+          }
+
+          @Override
+          protected void report(EnrichedEvent event, PipeRuntimeException exception) {}
+
+          @Override
+          protected boolean executeOnce() {
+            return false;
+          }
+        }) {
+      final long initMs = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+
+      long startNanos = System.nanoTime();
+      subtask.sleep4NonReportException();
+      Assert.assertTrue(elapsedMillisSince(startNanos) >= initMs);
+
+      // Restart the clock so the second assertion measures only the second call. Storing the
+      // elapsed duration here instead would make the check below trivially true.
+      startNanos = System.nanoTime();
+      subtask.sleep4NonReportException();
+      Assert.assertTrue(elapsedMillisSince(startNanos) >= initMs);
+    }
+  }
+
+  /** Returns the whole milliseconds elapsed since {@code startNanos} on the monotonic clock. */
+  private static long elapsedMillisSince(final long startNanos) {
+    return (System.nanoTime() - startNanos) / 1_000_000L;
+  }
+}

Reply via email to