This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 19834e45cd8 Pipe: Fixed the concurrency bug of stop / start (#16461)
19834e45cd8 is described below

commit 19834e45cd8c22f25c3c1f6237efc66f85f7e2cf
Author: Caideyipi <[email protected]>
AuthorDate: Tue Sep 23 14:07:48 2025 +0800

    Pipe: Fixed the concurrency bug of stop / start (#16461)
---
 .../iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java      | 5 ++---
 .../org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 7 +++++--
 .../org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java    | 6 +++++-
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 5884ca94b23..d9a2347acec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -225,12 +225,11 @@ public class PipeDataNodeRuntimeAgent implements IService {
         pipeRuntimeException.getMessage(),
         pipeRuntimeException);
 
-    pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
-
     // Quick stop all pipes locally if critical exception occurs,
     // no need to wait for the next heartbeat cycle.
     if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
-      PipeDataNodeAgent.task().stopAllPipesWithCriticalException();
+      PipeDataNodeAgent.task()
+          .stopAllPipesWithCriticalExceptionAndTrackException(pipeTaskMeta, pipeRuntimeException);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 728e2801c8f..c4044a7213e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
@@ -373,8 +374,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return true;
   }
 
-  public void stopAllPipesWithCriticalException() {
-    super.stopAllPipesWithCriticalException(CONFIG.getDataNodeId());
+  public void stopAllPipesWithCriticalExceptionAndTrackException(
+      final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
+    super.stopAllPipesWithCriticalException(
+        CONFIG.getDataNodeId(), pipeTaskMeta, pipeRuntimeException);
   }
 
   ///////////////////////// Heartbeat /////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 88ea0941443..dffb0c47de1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -951,7 +951,10 @@ public abstract class PipeTaskAgent {
    * Using try lock method to prevent deadlock when stopping all pipes with critical exceptions and
    * {@link PipeTaskAgent#handlePipeMetaChanges(List)}} concurrently.
    */
-  protected void stopAllPipesWithCriticalException(final int currentNodeId) {
+  protected void stopAllPipesWithCriticalException(
+      final int currentNodeId,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipeRuntimeException pipeRuntimeException) {
     // To avoid deadlock, we use a new thread to stop all pipes.
     CompletableFuture.runAsync(
         () -> {
@@ -960,6 +963,7 @@ public abstract class PipeTaskAgent {
             while (true) {
               if (tryWriteLockWithTimeOut(5)) {
                 try {
+                  pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
                   stopAllPipesWithCriticalExceptionInternal(currentNodeId);
                   LOGGER.info("Stopped all pipes with critical exception.");
                   return;

Reply via email to