This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch md
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/md by this push:
     new 77d7af276a1 ex-t
77d7af276a1 is described below

commit 77d7af276a17618d239df4b4fd8f4d007d8bc8ed
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 20 10:05:15 2026 +0800

    ex-t
---
 .../pipe/agent/task/PipeConfigNodeSubtask.java     |  1 +
 .../subtask/processor/PipeProcessorSubtask.java    |  1 +
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 38 ++++++++++++++--------
 .../agent/task/subtask/PipeReportableSubtask.java  |  1 +
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 12 +++----
 5 files changed, 33 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index e922c23321a..a4d0e976562 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -259,6 +259,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
+    lastExceptionTime = Long.MIN_VALUE;
     PipeConfigNodeAgent.runtime().report(event, exception);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index b481117c5e0..983b5a51b90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -342,6 +342,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
+    lastExceptionTime = Long.MIN_VALUE;
     PipeDataNodeAgent.runtime().report(event, exception);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index a46e174ef12..83228eb39f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -66,7 +66,6 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
   // when no event can be pulled.
   public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
       new PipeHeartbeatEvent("cron", false);
-  private long lastExceptionTime = Long.MIN_VALUE;
 
   public PipeSinkSubtask(
       final String taskID,
@@ -135,19 +134,17 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
 
       decreaseReferenceCountAndReleaseLastEvent(event, true);
       sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
-    } catch (final PipeNonReportException e) {
-      sleep4NonReportException();
-    } catch (final PipeException e) {
-      if (!isClosed.get()) {
-        setLastExceptionEvent(event);
-        throw e;
-      } else {
-        LOGGER.info(
-            "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
-            e.getClass().getSimpleName(),
-            e.getMessage() != null ? " Message: " + e.getMessage() : "");
-        clearReferenceCountAndReleaseLastEvent(event);
+    } catch (final PipeRuntimeSinkNonReportTimeConfigurableException e) {
+      if (lastExceptionTime == Long.MIN_VALUE) {
+        lastExceptionTime = System.currentTimeMillis();
       }
+      if (System.currentTimeMillis() - lastExceptionTime < e.getInterval()) {
+        sleep4NonReportException();
+        return true;
+      }
+      handlePipeException(event, e);
+    } catch (final PipeException e) {
+      handlePipeException(event, e);
     } catch (final Exception e) {
       if (!isClosed.get()) {
         setLastExceptionEvent(event);
@@ -171,6 +168,19 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     return true;
   }
 
+  private void handlePipeException(final Event event, final PipeException e) {
+    if (!isClosed.get()) {
+      setLastExceptionEvent(event);
+      throw e;
+    } else {
+      LOGGER.info(
+          "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
+          e.getClass().getSimpleName(),
+          e.getMessage() != null ? " Message: " + e.getMessage() : "");
+      clearReferenceCountAndReleaseLastEvent(event);
+    }
+  }
+
   private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
     // DO NOT call heartbeat or transfer after closed, or will cause 
connection leak
     if (isClosed.get()) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index f290f8c4965..ee274cf69ed 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -36,6 +36,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
   // To ensure that high-priority tasks can obtain object locks first, a 
counter is now used to save
   // the number of high-priority tasks.
   protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
+  protected long lastExceptionTime = Long.MIN_VALUE;
 
   protected PipeReportableSubtask(final String taskID, final long 
creationTime) {
     super(taskID, creationTime);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index e20ee03c275..b713a084a82 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -181,11 +180,12 @@ public class PipeReceiverStatusHandler {
                       + " seconds",
               status);
           exceptionEventHasBeenRetried.set(true);
-          throw status.getCode() == 1815
-                  && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
-              ? new PipeNonReportException(exceptionMessage)
-              : new PipeRuntimeSinkNonReportTimeConfigurableException(
-                  exceptionMessage, retryMaxMillisWhenConflictOccurs);
+          throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+              exceptionMessage,
+              status.getCode() == 1815
+                      && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+                  ? Long.MAX_VALUE
+                  : retryMaxMillisWhenConflictOccurs);
         }
 
       case 803: // NO_PERMISSION

Reply via email to