This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-deadlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8d0fadbbcb89e1dc7d6f0f24cfe6bab93fbd2dcf
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Jan 14 22:42:00 2024 +0800

    Pipe: Fix deadlock when PipeTaskAgent.handlePipeMetaChanges and PipeTaskDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently
    
    Found one Java-level deadlock:
    =============================
    "pool-41-IoTDB-DataNodeInternalRPC-Processor-2":
      waiting to lock monitor 0x00000299461ba110 (object 0x000000054e91ea58, a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask),
      which is held by "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1"
    
    "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1":
      waiting to lock monitor 0x000002994731e980 (object 0x0000000404acc078, a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent),
      which is held by "pool-41-IoTDB-DataNodeInternalRPC-Processor-2"
    
    Java stack information for the threads listed above:
    ===================================================
    "pool-41-IoTDB-DataNodeInternalRPC-Processor-2":
            at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.submitSelf(PipeConnectorSubtask.java:297)
            - waiting to lock <0x000000054e91ea58> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask)
            at org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor.start(PipeSubtaskExecutor.java:90)
            - locked <0x000000054e91e840> (a org.apache.iotdb.db.pipe.execution.executor.dataregion.PipeDataRegionConnectorSubtaskExecutor)
            at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle.start(PipeConnectorSubtaskLifeCycle.java:121)
            - locked <0x000000054e02eb90> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle)
            at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager.start(PipeConnectorSubtaskManager.java:155)
            - locked <0x000000054e02ea40> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager)
            at org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage.startSubtask(PipeTaskConnectorStage.java:66)
            at org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage.start(PipeTaskStage.java:86)
            - locked <0x000000054ee138f8> (a org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage)
            at org.apache.iotdb.db.pipe.task.PipeDataNodeTask.start(PipeDataNodeTask.java:67)
            at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.startPipe(PipeTaskAgent.java:513)
            at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.executeSinglePipeRuntimeMetaChanges(PipeTaskAgent.java:248)
            at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.executeSinglePipeMetaChanges(PipeTaskAgent.java:168)
            at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.handlePipeMetaChangesInternal(PipeTaskAgent.java:325)
            at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.handlePipeMetaChanges(PipeTaskAgent.java:306)
            - locked <0x0000000404acc078> (a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent)
            at org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl.pushPipeMeta(DataNodeInternalRPCServiceImpl.java:961)
            at org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService$Processor$pushPipeMeta.getResult(IDataNodeRPCService.java:5746)
            at org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService$Processor$pushPipeMeta.getResult(IDataNodeRPCService.java:5726)
            at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
            at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:38)
            at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248)
            at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
            at java.lang.Thread.run([email protected]/Thread.java:833)
    "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1":
            at org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent.stopAllPipesWithCriticalException(PipeTaskDataNodeAgent.java:79)
            - waiting to lock <0x0000000404acc078> (a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent)
            at org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent.report(PipeRuntimeAgent.java:142)
            at org.apache.iotdb.db.pipe.event.EnrichedEvent.reportException(EnrichedEvent.java:220)
            at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.onFailure(PipeConnectorSubtask.java:253)
            - locked <0x000000054e91ea58> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask)
            at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1119)
            at org.apache.iotdb.commons.concurrent.WrappedRunnable$1.runMayThrow(WrappedRunnable.java:45)
            at org.apache.iotdb.commons.concurrent.WrappedRunnable.run(WrappedRunnable.java:30)
            at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
            at java.lang.Thread.run([email protected]/Thread.java:833)
    
    Found 1 deadlock.
---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  2 +-
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  | 29 ++++++++++++++++------
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 20 +++++++++++----
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  4 +++
 4 files changed, 41 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 991c57b3a15..c8efcecad43 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -139,7 +139,7 @@ public class PipeRuntimeAgent implements IService {
     // Quick stop all pipes locally if critical exception occurs,
     // no need to wait for the next heartbeat cycle.
     if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
-      PipeAgent.task().stopAllPipesWithCriticalException();
+      PipeAgent.task().stopAllPipesWithCriticalExceptionIfPossible();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 4ac18a1168e..5402369a547 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -75,12 +75,25 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
     return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
   }
 
-  public synchronized void stopAllPipesWithCriticalException() {
-    acquireWriteLock();
-    try {
-      stopAllPipesWithCriticalExceptionInternal();
-    } finally {
-      releaseWriteLock();
+  /**
+   * Uses a timed try-lock to prevent a deadlock when all pipes are stopped due to a critical
+   * exception while {@link PipeTaskDataNodeAgent#handlePipeMetaChanges(List)} runs concurrently.
+   *
+   * @return true if the lock is acquired and all pipes are stopped, false if the lock is not
+   *     acquired
+   */
+  public boolean stopAllPipesWithCriticalExceptionIfPossible() {
+    if (tryWriteLockWithTimeOut(5)) {
+      try {
+        stopAllPipesWithCriticalExceptionInternal();
+        return true;
+      } finally {
+        releaseWriteLock();
+      }
+    } else {
+      LOGGER.info(
+          "Failed to stop all pipes with critical exception because of timeout 
(5s). Ignored.");
+      return false;
     }
   }
 
@@ -220,7 +233,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
   ///////////////////////// Heartbeat /////////////////////////
 
-  public synchronized void collectPipeMetaList(TDataNodeHeartbeatResp resp) 
throws TException {
+  public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws 
TException {
     // Try the lock instead of directly acquire it to prevent the block of the 
cluster heartbeat
     // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class 
BaseNodeCache in ConfigNode
     if (!tryReadLockWithTimeOut(10)) {
@@ -251,7 +264,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
     resp.setPipeMetaList(pipeMetaBinaryList);
   }
 
-  public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, 
TPipeHeartbeatResp resp)
+  public void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp 
resp)
       throws TException {
     acquireReadLock();
     try {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 79575a6bb17..0a32cd85dfc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -81,7 +81,7 @@ public abstract class PipeTaskAgent {
       return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
+      LOGGER.warn("Interruption during requiring pipeMetaKeeper read lock.", 
e);
       return false;
     }
   }
@@ -94,13 +94,23 @@ public abstract class PipeTaskAgent {
     pipeMetaKeeper.acquireWriteLock();
   }
 
+  protected boolean tryWriteLockWithTimeOut(long timeOutInSeconds) {
+    try {
+      return pipeMetaKeeper.tryWriteLock(timeOutInSeconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interruption during requiring pipeMetaKeeper write lock.", 
e);
+      return false;
+    }
+  }
+
   protected void releaseWriteLock() {
     pipeMetaKeeper.releaseWriteLock();
   }
 
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
-  public synchronized TPushPipeMetaRespExceptionMessage 
handleSinglePipeMetaChanges(
+  public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
       PipeMeta pipeMetaFromCoordinator) {
     acquireWriteLock();
     try {
@@ -272,7 +282,7 @@ public abstract class PipeTaskAgent {
     }
   }
 
-  public synchronized TPushPipeMetaRespExceptionMessage handleDropPipe(String 
pipeName) {
+  public TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) {
     acquireWriteLock();
     try {
       return handleDropPipeInternal(pipeName);
@@ -299,7 +309,7 @@ public abstract class PipeTaskAgent {
     }
   }
 
-  public synchronized List<TPushPipeMetaRespExceptionMessage> 
handlePipeMetaChanges(
+  public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
       List<PipeMeta> pipeMetaListFromCoordinator) {
     acquireWriteLock();
     try {
@@ -356,7 +366,7 @@ public abstract class PipeTaskAgent {
     return exceptionMessages;
   }
 
-  public synchronized void dropAllPipeTasks() {
+  public void dropAllPipeTasks() {
     acquireWriteLock();
     try {
       dropAllPipeTasksInternal();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 86b930b3b04..003eec9bdd8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -59,6 +59,10 @@ public class PipeMetaKeeper {
     pipeMetaKeeperLock.writeLock().lock();
   }
 
+  public boolean tryWriteLock(long timeOut) throws InterruptedException {
+    return pipeMetaKeeperLock.writeLock().tryLock(timeOut, TimeUnit.SECONDS);
+  }
+
   public void releaseWriteLock() {
     pipeMetaKeeperLock.writeLock().unlock();
   }

Reply via email to