This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch LocalDataBlockTransferDeadLock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 708eb29985ef62e7e97eaed5881340c9641412e7 Author: JackieTien97 <[email protected]> AuthorDate: Sun Jun 19 16:52:36 2022 +0800 Fix Dead Lock Bug --- .../execution/datatransfer/LocalSinkHandle.java | 72 ++++++++++++++-------- .../execution/datatransfer/LocalSourceHandle.java | 41 ++++++++---- .../execution/datatransfer/SharedTsBlockQueue.java | 23 +++---- 3 files changed, 84 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java index fa59e7c98e..40b5760b82 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java @@ -85,32 +85,43 @@ public class LocalSinkHandle implements ISinkHandle { @Override public boolean isFinished() { - return queue.hasNoMoreTsBlocks() && queue.isEmpty(); + synchronized (queue) { + return queue.hasNoMoreTsBlocks() && queue.isEmpty(); + } } public void checkAndInvokeOnFinished() { - if (isFinished()) { - synchronized (this) { - sinkHandleListener.onFinish(this); + synchronized (queue) { + if (isFinished()) { + synchronized (this) { + sinkHandleListener.onFinish(this); + } } } } @Override - public synchronized void send(List<TsBlock> tsBlocks) { + public void send(List<TsBlock> tsBlocks) { Validate.notNull(tsBlocks, "tsBlocks is null"); - if (aborted) { - throw new IllegalStateException("Sink handle is aborted."); - } - if (!blocked.isDone()) { - throw new IllegalStateException("Sink handle is blocked."); - } - if (queue.hasNoMoreTsBlocks()) { - return; + synchronized (this) { + if (aborted) { + throw new IllegalStateException("Sink handle is aborted."); + } + if (!blocked.isDone()) { + throw new IllegalStateException("Sink handle is blocked."); + } } - logger.info("send TsBlocks. Size: {}", tsBlocks.size()); - for (TsBlock tsBlock : tsBlocks) { - blocked = queue.add(tsBlock); + + synchronized (queue) { + if (queue.hasNoMoreTsBlocks()) { + return; + } + logger.info("send TsBlocks. Size: {}", tsBlocks.size()); + synchronized (this) { + for (TsBlock tsBlock : tsBlocks) { + blocked = queue.add(tsBlock); + } + } } } @@ -121,24 +132,33 @@ public class LocalSinkHandle implements ISinkHandle { @Override public void setNoMoreTsBlocks() { - synchronized (this) { - logger.info("set noMoreTsBlocks."); - if (aborted) { - return; + synchronized (queue) { + synchronized (this) { + logger.info("set noMoreTsBlocks."); + if (aborted) { + return; + } + queue.setNoMoreTsBlocks(true); + sinkHandleListener.onEndOfBlocks(this); } - queue.setNoMoreTsBlocks(true); - sinkHandleListener.onEndOfBlocks(this); } checkAndInvokeOnFinished(); logger.info("noMoreTsBlocks has been set."); } @Override - public synchronized void abort() { + public void abort() { logger.info("Sink handle is being aborted."); - aborted = true; - queue.destroy(); - sinkHandleListener.onAborted(this); + synchronized (queue) { + synchronized (this) { + if (aborted) { + return; + } + aborted = true; + queue.destroy(); + sinkHandleListener.onAborted(this); + } + } logger.info("Sink handle is aborted"); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java index ac91afacda..644a21ccb9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java @@ -26,11 +26,16 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager.createFullIdFrom; public class LocalSourceHandle implements ISourceHandle { + + private static final Logger logger = LoggerFactory.getLogger(LocalSourceHandle.class); + private final TFragmentInstanceId remoteFragmentInstanceId; private final TFragmentInstanceId localFragmentInstanceId; private final String localPlanNodeId; @@ -81,7 +86,7 @@ public class LocalSourceHandle implements ISourceHandle { throw new IllegalStateException("Source handle is blocked."); } TsBlock tsBlock; - synchronized (this) { + synchronized (queue) { tsBlock = queue.remove(); } checkAndInvokeOnFinished(); @@ -91,16 +96,20 @@ public class LocalSourceHandle implements ISourceHandle { @Override public boolean isFinished() { - return queue.hasNoMoreTsBlocks() && queue.isEmpty(); + synchronized (queue) { + return queue.hasNoMoreTsBlocks() && queue.isEmpty(); + } } public void checkAndInvokeOnFinished() { - if (isFinished()) { - // Putting synchronized here rather than marking in method is to avoid deadlock. - // There are two locks need to invoke this method. One is lock of SharedTsBlockQueue, - // the other is lock of LocalSourceHandle. - synchronized (this) { - sourceHandleListener.onFinished(this); + synchronized (queue) { + if (isFinished()) { + // Putting synchronized here rather than marking in method is to avoid deadlock. + // There are two locks need to invoke this method. One is lock of SharedTsBlockQueue, + // the other is lock of LocalSourceHandle. + synchronized (this) { + sourceHandleListener.onFinished(this); + } } } } @@ -120,14 +129,20 @@ public class LocalSourceHandle implements ISourceHandle { @Override public synchronized void abort() { + logger.info("Source handle is being aborted."); try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted) { - return; + synchronized (queue) { + synchronized (this) { + if (aborted) { + return; + } + queue.destroy(); + aborted = true; + sourceHandleListener.onAborted(this); + } } - queue.destroy(); - aborted = true; - sourceHandleListener.onAborted(this); } + logger.info("Source handle is aborted"); } public TFragmentInstanceId getRemoteFragmentInstanceId() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java index 61b94f3ecd..729e7323b5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java @@ -29,11 +29,13 @@ import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.NotThreadSafe; import java.util.LinkedList; import java.util.Queue; +/** This is not thread safe class, the caller should ensure multi-threads safety. */ +@NotThreadSafe public class SharedTsBlockQueue { private static final Logger logger = LoggerFactory.getLogger(SharedTsBlockQueue.class); @@ -41,22 +43,16 @@ public class SharedTsBlockQueue { private final TFragmentInstanceId localFragmentInstanceId; private final LocalMemoryManager localMemoryManager; - @GuardedBy("this") private boolean noMoreTsBlocks = false; - @GuardedBy("this") private long bufferRetainedSizeInBytes = 0L; - @GuardedBy("this") private final Queue<TsBlock> queue = new LinkedList<>(); - @GuardedBy("this") private SettableFuture<Void> blocked = SettableFuture.create(); - @GuardedBy("this") private ListenableFuture<Void> blockedOnMemory; - @GuardedBy("this") private boolean destroyed = false; private LocalSourceHandle sourceHandle; @@ -70,7 +66,7 @@ public class SharedTsBlockQueue { Validate.notNull(localMemoryManager, "local memory manager cannot be null"); } - public synchronized boolean hasNoMoreTsBlocks() { + public boolean hasNoMoreTsBlocks() { return noMoreTsBlocks; } @@ -82,7 +78,7 @@ public class SharedTsBlockQueue { return blocked; } - public synchronized boolean isEmpty() { + public boolean isEmpty() { return queue.isEmpty(); } @@ -95,7 +91,8 @@ public class SharedTsBlockQueue { } /** Notify no more tsblocks will be added to the queue. */ - public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) { + public void setNoMoreTsBlocks(boolean noMoreTsBlocks) { + logger.info("SharedTsBlockQueue receive no more TsBlocks signal."); if (destroyed) { throw new IllegalStateException("queue has been destroyed"); } @@ -112,7 +109,7 @@ public class SharedTsBlockQueue { * Remove a tsblock from the head of the queue and return. Should be invoked only when the future * returned by {@link #isBlocked()} completes. */ - public synchronized TsBlock remove() { + public TsBlock remove() { if (destroyed) { throw new IllegalStateException("queue has been destroyed"); } @@ -136,7 +133,7 @@ public class SharedTsBlockQueue { * Add tsblocks to the queue. Except the first invocation, this method should be invoked only when * the returned future of last invocation completes. */ - public synchronized ListenableFuture<Void> add(TsBlock tsBlock) { + public ListenableFuture<Void> add(TsBlock tsBlock) { if (destroyed) { throw new IllegalStateException("queue has been destroyed"); } @@ -156,7 +153,7 @@ public class SharedTsBlockQueue { } /** Destroy the queue and cancel the future. */ - public synchronized void destroy() { + public void destroy() { if (destroyed) { return; }
