This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch md
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/md by this push:
new 77d7af276a1 ex-t
77d7af276a1 is described below
commit 77d7af276a17618d239df4b4fd8f4d007d8bc8ed
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 20 10:05:15 2026 +0800
ex-t
---
.../pipe/agent/task/PipeConfigNodeSubtask.java | 1 +
.../subtask/processor/PipeProcessorSubtask.java | 1 +
.../agent/task/subtask/sink/PipeSinkSubtask.java | 38 ++++++++++++++--------
.../agent/task/subtask/PipeReportableSubtask.java | 1 +
.../pipe/receiver/PipeReceiverStatusHandler.java | 12 +++----
5 files changed, 33 insertions(+), 20 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index e922c23321a..a4d0e976562 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -259,6 +259,7 @@ public class PipeConfigNodeSubtask extends PipeAbstractSinkSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
+ lastExceptionTime = Long.MIN_VALUE;
PipeConfigNodeAgent.runtime().report(event, exception);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index b481117c5e0..983b5a51b90 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -342,6 +342,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
+ lastExceptionTime = Long.MIN_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index a46e174ef12..83228eb39f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -66,7 +66,6 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// when no event can be pulled.
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
- private long lastExceptionTime = Long.MIN_VALUE;
public PipeSinkSubtask(
final String taskID,
@@ -135,19 +134,17 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
- } catch (final PipeNonReportException e) {
- sleep4NonReportException();
- } catch (final PipeException e) {
- if (!isClosed.get()) {
- setLastExceptionEvent(event);
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because the connector subtask is dropped.{}",
- e.getClass().getSimpleName(),
- e.getMessage() != null ? " Message: " + e.getMessage() : "");
- clearReferenceCountAndReleaseLastEvent(event);
+ } catch (final PipeRuntimeSinkNonReportTimeConfigurableException e) {
+ if (lastExceptionTime == Long.MIN_VALUE) {
+ lastExceptionTime = System.currentTimeMillis();
}
+ if (System.currentTimeMillis() - lastExceptionTime < e.getInterval()) {
+ sleep4NonReportException();
+ return true;
+ }
+ handlePipeException(event, e);
+ } catch (final PipeException e) {
+ handlePipeException(event, e);
} catch (final Exception e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
@@ -171,6 +168,19 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
return true;
}
+ private void handlePipeException(final Event event, final PipeException e) {
+ if (!isClosed.get()) {
+ setLastExceptionEvent(event);
+ throw e;
+ } else {
+ LOGGER.info(
+ "{} in pipe transfer, ignored because the connector subtask is dropped.{}",
+ e.getClass().getSimpleName(),
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
+ clearReferenceCountAndReleaseLastEvent(event);
+ }
+ }
+
private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
// DO NOT call heartbeat or transfer after closed, or will cause connection leak
if (isClosed.get()) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index f290f8c4965..ee274cf69ed 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -36,6 +36,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
// To ensure that high-priority tasks can obtain object locks first, a counter is now used to save
// the number of high-priority tasks.
protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
+ protected long lastExceptionTime = Long.MIN_VALUE;
protected PipeReportableSubtask(final String taskID, final long creationTime) {
super(taskID, creationTime);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index e20ee03c275..b713a084a82 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -181,11 +180,12 @@ public class PipeReceiverStatusHandler {
+ " seconds",
status);
exceptionEventHasBeenRetried.set(true);
- throw status.getCode() == 1815
- && PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
- ? new PipeNonReportException(exceptionMessage)
- : new PipeRuntimeSinkNonReportTimeConfigurableException(
- exceptionMessage, retryMaxMillisWhenConflictOccurs);
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage,
+ status.getCode() == 1815
+ && PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+ ? Long.MAX_VALUE
+ : retryMaxMillisWhenConflictOccurs);
}
case 803: // NO_PERMISSION