This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch md
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/md by this push:
     new e0cebcef705 fix
e0cebcef705 is described below

commit e0cebcef70590805e51b98966bb80d1ca936f226
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 20 10:13:19 2026 +0800

    fix
---
 .../pipe/agent/task/PipeConfigNodeSubtask.java     | 26 ++++++++++------------
 .../subtask/processor/PipeProcessorSubtask.java    |  1 -
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 16 ++-----------
 .../task/subtask/PipeAbstractSinkSubtask.java      | 15 +++++++++++++
 .../agent/task/subtask/PipeReportableSubtask.java  |  1 -
 5 files changed, 29 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index a4d0e976562..20e7d6f10cb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.confignode.manager.pipe.agent.task;
 
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
@@ -191,19 +191,17 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
       }
       decreaseReferenceCountAndReleaseLastEvent(event, true);
       sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
-    } catch (final PipeNonReportException e) {
-      sleep4NonReportException();
-    } catch (final PipeException e) {
-      setLastExceptionEvent(event);
-      if (!isClosed.get()) {
-        throw e;
-      } else {
-        LOGGER.info(
-            "{} in pipe transfer, ignored because pipe is dropped.",
-            e.getClass().getSimpleName(),
-            e);
-        clearReferenceCountAndReleaseLastEvent(event);
+    } catch (final PipeRuntimeSinkNonReportTimeConfigurableException e) {
+      if (lastExceptionTime == Long.MAX_VALUE) {
+        lastExceptionTime = System.currentTimeMillis();
       }
+      if (System.currentTimeMillis() - lastExceptionTime < e.getInterval()) {
+        sleep4NonReportException();
+        return true;
+      }
+      handlePipeException(event, e);
+    } catch (final PipeException e) {
+      handlePipeException(event, e);
     } catch (final Exception e) {
       setLastExceptionEvent(event);
       if (!isClosed.get()) {
@@ -259,7 +257,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
-    lastExceptionTime = Long.MIN_VALUE;
+    lastExceptionTime = Long.MAX_VALUE;
     PipeConfigNodeAgent.runtime().report(event, exception);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 983b5a51b90..b481117c5e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -342,7 +342,6 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
-    lastExceptionTime = Long.MIN_VALUE;
     PipeDataNodeAgent.runtime().report(event, exception);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 83228eb39f1..37a43eea62f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -135,7 +135,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       decreaseReferenceCountAndReleaseLastEvent(event, true);
       sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
     } catch (final PipeRuntimeSinkNonReportTimeConfigurableException e) {
-      if (lastExceptionTime == Long.MIN_VALUE) {
+      if (lastExceptionTime == Long.MAX_VALUE) {
         lastExceptionTime = System.currentTimeMillis();
       }
       if (System.currentTimeMillis() - lastExceptionTime < e.getInterval()) {
@@ -168,19 +168,6 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     return true;
   }
 
-  private void handlePipeException(final Event event, final PipeException e) {
-    if (!isClosed.get()) {
-      setLastExceptionEvent(event);
-      throw e;
-    } else {
-      LOGGER.info(
-          "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
-          e.getClass().getSimpleName(),
-          e.getMessage() != null ? " Message: " + e.getMessage() : "");
-      clearReferenceCountAndReleaseLastEvent(event);
-    }
-  }
-
   private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
     // DO NOT call heartbeat or transfer after closed, or will cause 
connection leak
     if (isClosed.get()) {
@@ -393,6 +380,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
+    lastExceptionTime = Long.MAX_VALUE;
     PipeDataNodeAgent.runtime().report(event, exception);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 09083d45bca..59ea95a9338 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -54,6 +55,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   protected volatile Event lastExceptionEvent;
 
   protected long sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+  protected long lastExceptionTime = Long.MAX_VALUE;
 
   protected PipeAbstractSinkSubtask(
       final String taskID, final long creationTime, final PipeConnector 
outputPipeSink) {
@@ -261,4 +263,17 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
       Thread.currentThread().interrupt();
     }
   }
+
+  protected void handlePipeException(final Event event, final PipeException e) 
{
+    if (!isClosed.get()) {
+      setLastExceptionEvent(event);
+      throw e;
+    } else {
+      LOGGER.info(
+          "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
+          e.getClass().getSimpleName(),
+          e.getMessage() != null ? " Message: " + e.getMessage() : "");
+      clearReferenceCountAndReleaseLastEvent(event);
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index ee274cf69ed..f290f8c4965 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -36,7 +36,6 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
   // To ensure that high-priority tasks can obtain object locks first, a 
counter is now used to save
   // the number of high-priority tasks.
   protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
-  protected long lastExceptionTime = Long.MIN_VALUE;
 
   protected PipeReportableSubtask(final String taskID, final long 
creationTime) {
     super(taskID, creationTime);

Reply via email to