This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-deadlock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8d0fadbbcb89e1dc7d6f0f24cfe6bab93fbd2dcf Author: Steve Yurong Su <[email protected]> AuthorDate: Sun Jan 14 22:42:00 2024 +0800 Pipe: Fix deadlock when PipeTaskAgent.handlePipeMetaChanges and PipeTaskDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently Found one Java-level deadlock: ============================= "pool-41-IoTDB-DataNodeInternalRPC-Processor-2": waiting to lock monitor 0x00000299461ba110 (object 0x000000054e91ea58, a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask), which is held by "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1" "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1": waiting to lock monitor 0x000002994731e980 (object 0x0000000404acc078, a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent), which is held by "pool-41-IoTDB-DataNodeInternalRPC-Processor-2" Java stack information for the threads listed above: =================================================== "pool-41-IoTDB-DataNodeInternalRPC-Processor-2": at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.submitSelf(PipeConnectorSubtask.java:297) - waiting to lock <0x000000054e91ea58> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask) at org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor.start(PipeSubtaskExecutor.java:90) - locked <0x000000054e91e840> (a org.apache.iotdb.db.pipe.execution.executor.dataregion.PipeDataRegionConnectorSubtaskExecutor) at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle.start(PipeConnectorSubtaskLifeCycle.java:121) - locked <0x000000054e02eb90> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle) at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager.start(PipeConnectorSubtaskManager.java:155) - locked <0x000000054e02ea40> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager) at 
org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage.startSubtask(PipeTaskConnectorStage.java:66) at org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage.start(PipeTaskStage.java:86) - locked <0x000000054ee138f8> (a org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage) at org.apache.iotdb.db.pipe.task.PipeDataNodeTask.start(PipeDataNodeTask.java:67) at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.startPipe(PipeTaskAgent.java:513) at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.executeSinglePipeRuntimeMetaChanges(PipeTaskAgent.java:248) at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.executeSinglePipeMetaChanges(PipeTaskAgent.java:168) at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.handlePipeMetaChangesInternal(PipeTaskAgent.java:325) at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.handlePipeMetaChanges(PipeTaskAgent.java:306) - locked <0x0000000404acc078> (a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent) at org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl.pushPipeMeta(DataNodeInternalRPCServiceImpl.java:961) at org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService$Processor$pushPipeMeta.getResult(IDataNodeRPCService.java:5746) at org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService$Processor$pushPipeMeta.getResult(IDataNodeRPCService.java:5726) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:38) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248) at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136) at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635) at java.lang.Thread.run([email protected]/Thread.java:833) "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1": at 
org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent.stopAllPipesWithCriticalException(PipeTaskDataNodeAgent.java:79) - waiting to lock <0x0000000404acc078> (a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent) at org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent.report(PipeRuntimeAgent.java:142) at org.apache.iotdb.db.pipe.event.EnrichedEvent.reportException(EnrichedEvent.java:220) at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.onFailure(PipeConnectorSubtask.java:253) - locked <0x000000054e91ea58> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask) at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1119) at org.apache.iotdb.commons.concurrent.WrappedRunnable$1.runMayThrow(WrappedRunnable.java:45) at org.apache.iotdb.commons.concurrent.WrappedRunnable.run(WrappedRunnable.java:30) at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136) at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635) at java.lang.Thread.run([email protected]/Thread.java:833) Found 1 deadlock. 
--- .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 2 +- .../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 29 ++++++++++++++++------ .../commons/pipe/agent/task/PipeTaskAgent.java | 20 +++++++++++---- .../commons/pipe/task/meta/PipeMetaKeeper.java | 4 +++ 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 991c57b3a15..c8efcecad43 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -139,7 +139,7 @@ public class PipeRuntimeAgent implements IService { // Quick stop all pipes locally if critical exception occurs, // no need to wait for the next heartbeat cycle. if (pipeRuntimeException instanceof PipeRuntimeCriticalException) { - PipeAgent.task().stopAllPipesWithCriticalException(); + PipeAgent.task().stopAllPipesWithCriticalExceptionIfPossible(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java index 4ac18a1168e..5402369a547 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java @@ -75,12 +75,25 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build(); } - public synchronized void stopAllPipesWithCriticalException() { - acquireWriteLock(); - try { - stopAllPipesWithCriticalExceptionInternal(); - } finally { - releaseWriteLock(); + /** + * Using try lock method to prevent deadlock when stopping all pipes with critical 
exceptions and + * {@link PipeTaskDataNodeAgent#handlePipeMetaChanges(List)} concurrently. + * + * @return true if the lock is acquired and all pipes are stopped, false if the lock is not + * acquired + */ + public boolean stopAllPipesWithCriticalExceptionIfPossible() { + if (tryWriteLockWithTimeOut(5)) { + try { + stopAllPipesWithCriticalExceptionInternal(); + return true; + } finally { + releaseWriteLock(); + } + } else { + LOGGER.info( + "Failed to stop all pipes with critical exception because of timeout (5s). Ignored."); + return false; } } @@ -220,7 +233,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { ///////////////////////// Heartbeat ///////////////////////// - public synchronized void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException { + public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException { // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode if (!tryReadLockWithTimeOut(10)) { @@ -251,7 +264,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { resp.setPipeMetaList(pipeMetaBinaryList); } - public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) + public void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) throws TException { acquireReadLock(); try { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 79575a6bb17..0a32cd85dfc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -81,7 +81,7 @@ public abstract class PipeTaskAgent { return 
pipeMetaKeeper.tryReadLock(timeOutInSeconds); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e); + LOGGER.warn("Interruption during requiring pipeMetaKeeper read lock.", e); return false; } } @@ -94,13 +94,23 @@ public abstract class PipeTaskAgent { pipeMetaKeeper.acquireWriteLock(); } + protected boolean tryWriteLockWithTimeOut(long timeOutInSeconds) { + try { + return pipeMetaKeeper.tryWriteLock(timeOutInSeconds); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interruption during requiring pipeMetaKeeper write lock.", e); + return false; + } + } + protected void releaseWriteLock() { pipeMetaKeeper.releaseWriteLock(); } ////////////////////////// Pipe Task Management Entry ////////////////////////// - public synchronized TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges( + public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges( PipeMeta pipeMetaFromCoordinator) { acquireWriteLock(); try { @@ -272,7 +282,7 @@ public abstract class PipeTaskAgent { } } - public synchronized TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) { + public TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) { acquireWriteLock(); try { return handleDropPipeInternal(pipeName); @@ -299,7 +309,7 @@ public abstract class PipeTaskAgent { } } - public synchronized List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges( + public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges( List<PipeMeta> pipeMetaListFromCoordinator) { acquireWriteLock(); try { @@ -356,7 +366,7 @@ public abstract class PipeTaskAgent { return exceptionMessages; } - public synchronized void dropAllPipeTasks() { + public void dropAllPipeTasks() { acquireWriteLock(); try { dropAllPipeTasksInternal(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java index 86b930b3b04..003eec9bdd8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java @@ -59,6 +59,10 @@ public class PipeMetaKeeper { pipeMetaKeeperLock.writeLock().lock(); } + public boolean tryWriteLock(long timeOut) throws InterruptedException { + return pipeMetaKeeperLock.writeLock().tryLock(timeOut, TimeUnit.SECONDS); + } + public void releaseWriteLock() { pipeMetaKeeperLock.writeLock().unlock(); }
