This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch md
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/md by this push:
new e0cebcef705 fix
e0cebcef705 is described below
commit e0cebcef70590805e51b98966bb80d1ca936f226
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 20 10:13:19 2026 +0800
fix
---
.../pipe/agent/task/PipeConfigNodeSubtask.java | 26 ++++++++++------------
.../subtask/processor/PipeProcessorSubtask.java | 1 -
.../agent/task/subtask/sink/PipeSinkSubtask.java | 16 ++-----------
.../task/subtask/PipeAbstractSinkSubtask.java | 15 +++++++++++++
.../agent/task/subtask/PipeReportableSubtask.java | 1 -
5 files changed, 29 insertions(+), 30 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index a4d0e976562..20e7d6f10cb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.manager.pipe.agent.task;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
@@ -191,19 +191,17 @@ public class PipeConfigNodeSubtask extends PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
- } catch (final PipeNonReportException e) {
- sleep4NonReportException();
- } catch (final PipeException e) {
- setLastExceptionEvent(event);
- if (!isClosed.get()) {
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because pipe is dropped.",
- e.getClass().getSimpleName(),
- e);
- clearReferenceCountAndReleaseLastEvent(event);
+ } catch (final PipeRuntimeSinkNonReportTimeConfigurableException e) {
+ if (lastExceptionTime == Long.MAX_VALUE) {
+ lastExceptionTime = System.currentTimeMillis();
}
+ if (System.currentTimeMillis() - lastExceptionTime < e.getInterval()) {
+ sleep4NonReportException();
+ return true;
+ }
+ handlePipeException(event, e);
+ } catch (final PipeException e) {
+ handlePipeException(event, e);
} catch (final Exception e) {
setLastExceptionEvent(event);
if (!isClosed.get()) {
@@ -259,7 +257,7 @@ public class PipeConfigNodeSubtask extends PipeAbstractSinkSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
- lastExceptionTime = Long.MIN_VALUE;
+ lastExceptionTime = Long.MAX_VALUE;
PipeConfigNodeAgent.runtime().report(event, exception);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 983b5a51b90..b481117c5e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -342,7 +342,6 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
- lastExceptionTime = Long.MIN_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 83228eb39f1..37a43eea62f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -135,7 +135,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
} catch (final PipeRuntimeSinkNonReportTimeConfigurableException e) {
- if (lastExceptionTime == Long.MIN_VALUE) {
+ if (lastExceptionTime == Long.MAX_VALUE) {
lastExceptionTime = System.currentTimeMillis();
}
if (System.currentTimeMillis() - lastExceptionTime < e.getInterval()) {
@@ -168,19 +168,6 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
return true;
}
- private void handlePipeException(final Event event, final PipeException e) {
- if (!isClosed.get()) {
- setLastExceptionEvent(event);
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because the connector subtask is dropped.{}",
- e.getClass().getSimpleName(),
- e.getMessage() != null ? " Message: " + e.getMessage() : "");
- clearReferenceCountAndReleaseLastEvent(event);
- }
- }
-
private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
// DO NOT call heartbeat or transfer after closed, or will cause connection leak
if (isClosed.get()) {
@@ -393,6 +380,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
+ lastExceptionTime = Long.MAX_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 09083d45bca..59ea95a9338 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -54,6 +55,7 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
protected volatile Event lastExceptionEvent;
protected long sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+ protected long lastExceptionTime = Long.MAX_VALUE;
protected PipeAbstractSinkSubtask(
final String taskID, final long creationTime, final PipeConnector outputPipeSink) {
@@ -261,4 +263,17 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
Thread.currentThread().interrupt();
}
}
+
+ protected void handlePipeException(final Event event, final PipeException e) {
+ if (!isClosed.get()) {
+ setLastExceptionEvent(event);
+ throw e;
+ } else {
+ LOGGER.info(
+ "{} in pipe transfer, ignored because the connector subtask is dropped.{}",
+ e.getClass().getSimpleName(),
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
+ clearReferenceCountAndReleaseLastEvent(event);
+ }
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index ee274cf69ed..f290f8c4965 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -36,7 +36,6 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
// To ensure that high-priority tasks can obtain object locks first, a counter is now used to save
// the number of high-priority tasks.
protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
- protected long lastExceptionTime = Long.MIN_VALUE;
protected PipeReportableSubtask(final String taskID, final long creationTime) {
super(taskID, creationTime);