This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5aa3f1ae9ce Pipe: Fix the deadlock of PeriodicalJob thread caused by 
using parallelStream to split restartAllStuckPipes' subtasks (#14392) (#14402)
5aa3f1ae9ce is described below

commit 5aa3f1ae9ceaf2e2f886a6cf7ad07673c4338d05
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Dec 12 17:39:56 2024 +0800

    Pipe: Fix the deadlock of PeriodicalJob thread caused by using 
parallelStream to split restartAllStuckPipes' subtasks (#14392) (#14402)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index d730e22694e..836557c0231 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -486,8 +486,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       releaseWriteLock();
     }
 
-    // Restart all stuck pipes
-    stuckPipes.parallelStream().forEach(this::restartStuckPipe);
+    // Restart all stuck pipes.
+    // Note that parallelStream cannot be used here. The method 
PipeTaskAgent#dropPipe also uses
+    // parallelStream. If parallelStream is used here, the subtasks generated 
inside the dropPipe
+    // may not be scheduled by the worker thread of ForkJoinPool because of 
less available threads,
+    // and the parent task will wait for the completion of the subtasks in 
ForkJoinPool forever,
+    // causing the deadlock.
+    stuckPipes.forEach(this::restartStuckPipe);
   }
 
   private Set<PipeMeta> findAllStuckPipes() {

Reply via email to