This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 19834e45cd8 Pipe: Fixed the concurrency bug of stop / start (#16461)
19834e45cd8 is described below
commit 19834e45cd8c22f25c3c1f6237efc66f85f7e2cf
Author: Caideyipi <[email protected]>
AuthorDate: Tue Sep 23 14:07:48 2025 +0800
Pipe: Fixed the concurrency bug of stop / start (#16461)
---
.../iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java | 5 ++---
.../org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 7 +++++--
.../org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java | 6 +++++-
3 files changed, 12 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 5884ca94b23..d9a2347acec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -225,12 +225,11 @@ public class PipeDataNodeRuntimeAgent implements IService
{
pipeRuntimeException.getMessage(),
pipeRuntimeException);
- pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
-
// Quick stop all pipes locally if critical exception occurs,
// no need to wait for the next heartbeat cycle.
if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
- PipeDataNodeAgent.task().stopAllPipesWithCriticalException();
+ PipeDataNodeAgent.task()
+ .stopAllPipesWithCriticalExceptionAndTrackException(pipeTaskMeta, pipeRuntimeException);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 728e2801c8f..c4044a7213e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
@@ -373,8 +374,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return true;
}
- public void stopAllPipesWithCriticalException() {
- super.stopAllPipesWithCriticalException(CONFIG.getDataNodeId());
+ public void stopAllPipesWithCriticalExceptionAndTrackException(
+ final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
+ super.stopAllPipesWithCriticalException(
+ CONFIG.getDataNodeId(), pipeTaskMeta, pipeRuntimeException);
}
///////////////////////// Heartbeat /////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 88ea0941443..dffb0c47de1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -951,7 +951,10 @@ public abstract class PipeTaskAgent {
* Using try lock method to prevent deadlock when stopping all pipes with critical exceptions and
* {@link PipeTaskAgent#handlePipeMetaChanges(List)}} concurrently.
*/
- protected void stopAllPipesWithCriticalException(final int currentNodeId) {
+ protected void stopAllPipesWithCriticalException(
+ final int currentNodeId,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipeRuntimeException pipeRuntimeException) {
// To avoid deadlock, we use a new thread to stop all pipes.
CompletableFuture.runAsync(
() -> {
@@ -960,6 +963,7 @@ public abstract class PipeTaskAgent {
while (true) {
if (tryWriteLockWithTimeOut(5)) {
try {
+ pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
stopAllPipesWithCriticalExceptionInternal(currentNodeId);
LOGGER.info("Stopped all pipes with critical exception.");
return;