This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LocalDataBlockTransferDeadLock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 708eb29985ef62e7e97eaed5881340c9641412e7
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Jun 19 16:52:36 2022 +0800

    Fix Dead Lock Bug
---
 .../execution/datatransfer/LocalSinkHandle.java    | 72 ++++++++++++++--------
 .../execution/datatransfer/LocalSourceHandle.java  | 41 ++++++++----
 .../execution/datatransfer/SharedTsBlockQueue.java | 23 +++----
 3 files changed, 84 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
index fa59e7c98e..40b5760b82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
@@ -85,32 +85,43 @@ public class LocalSinkHandle implements ISinkHandle {
 
   @Override
   public boolean isFinished() {
-    return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+    synchronized (queue) {
+      return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+    }
   }
 
   public void checkAndInvokeOnFinished() {
-    if (isFinished()) {
-      synchronized (this) {
-        sinkHandleListener.onFinish(this);
+    synchronized (queue) {
+      if (isFinished()) {
+        synchronized (this) {
+          sinkHandleListener.onFinish(this);
+        }
       }
     }
   }
 
   @Override
-  public synchronized void send(List<TsBlock> tsBlocks) {
+  public void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is aborted.");
-    }
-    if (!blocked.isDone()) {
-      throw new IllegalStateException("Sink handle is blocked.");
-    }
-    if (queue.hasNoMoreTsBlocks()) {
-      return;
+    synchronized (this) {
+      if (aborted) {
+        throw new IllegalStateException("Sink handle is aborted.");
+      }
+      if (!blocked.isDone()) {
+        throw new IllegalStateException("Sink handle is blocked.");
+      }
     }
-    logger.info("send TsBlocks. Size: {}", tsBlocks.size());
-    for (TsBlock tsBlock : tsBlocks) {
-      blocked = queue.add(tsBlock);
+
+    synchronized (queue) {
+      if (queue.hasNoMoreTsBlocks()) {
+        return;
+      }
+      logger.info("send TsBlocks. Size: {}", tsBlocks.size());
+      synchronized (this) {
+        for (TsBlock tsBlock : tsBlocks) {
+          blocked = queue.add(tsBlock);
+        }
+      }
     }
   }
 
@@ -121,24 +132,33 @@ public class LocalSinkHandle implements ISinkHandle {
 
   @Override
   public void setNoMoreTsBlocks() {
-    synchronized (this) {
-      logger.info("set noMoreTsBlocks.");
-      if (aborted) {
-        return;
+    synchronized (queue) {
+      synchronized (this) {
+        logger.info("set noMoreTsBlocks.");
+        if (aborted) {
+          return;
+        }
+        queue.setNoMoreTsBlocks(true);
+        sinkHandleListener.onEndOfBlocks(this);
       }
-      queue.setNoMoreTsBlocks(true);
-      sinkHandleListener.onEndOfBlocks(this);
     }
     checkAndInvokeOnFinished();
     logger.info("noMoreTsBlocks has been set.");
   }
 
   @Override
-  public synchronized void abort() {
+  public void abort() {
     logger.info("Sink handle is being aborted.");
-    aborted = true;
-    queue.destroy();
-    sinkHandleListener.onAborted(this);
+    synchronized (queue) {
+      synchronized (this) {
+        if (aborted) {
+          return;
+        }
+        aborted = true;
+        queue.destroy();
+        sinkHandleListener.onAborted(this);
+      }
+    }
     logger.info("Sink handle is aborted");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
index ac91afacda..644a21ccb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
@@ -26,11 +26,16 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.concurrent.SetThreadName;
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager.createFullIdFrom;
 
 public class LocalSourceHandle implements ISourceHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalSourceHandle.class);
+
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
   private final String localPlanNodeId;
@@ -81,7 +86,7 @@ public class LocalSourceHandle implements ISourceHandle {
         throw new IllegalStateException("Source handle is blocked.");
       }
       TsBlock tsBlock;
-      synchronized (this) {
+      synchronized (queue) {
         tsBlock = queue.remove();
       }
       checkAndInvokeOnFinished();
@@ -91,16 +96,20 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public boolean isFinished() {
-    return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+    synchronized (queue) {
+      return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+    }
   }
 
   public void checkAndInvokeOnFinished() {
-    if (isFinished()) {
-      // Putting synchronized here rather than marking in method is to avoid deadlock.
-      // There are two locks need to invoke this method. One is lock of SharedTsBlockQueue,
-      // the other is lock of LocalSourceHandle.
-      synchronized (this) {
-        sourceHandleListener.onFinished(this);
+    synchronized (queue) {
+      if (isFinished()) {
+        // Putting synchronized here rather than marking in method is to avoid deadlock.
+        // There are two locks need to invoke this method. One is lock of SharedTsBlockQueue,
+        // the other is lock of LocalSourceHandle.
+        synchronized (this) {
+          sourceHandleListener.onFinished(this);
+        }
       }
     }
   }
@@ -120,14 +129,20 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public synchronized void abort() {
+    logger.info("Source handle is being aborted.");
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
-        return;
+      synchronized (queue) {
+        synchronized (this) {
+          if (aborted) {
+            return;
+          }
+          queue.destroy();
+          aborted = true;
+          sourceHandleListener.onAborted(this);
+        }
       }
-      queue.destroy();
-      aborted = true;
-      sourceHandleListener.onAborted(this);
     }
+    logger.info("Source handle is aborted");
   }
 
   public TFragmentInstanceId getRemoteFragmentInstanceId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index 61b94f3ecd..729e7323b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -29,11 +29,13 @@ import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.LinkedList;
 import java.util.Queue;
 
+/** This is not thread safe class, the caller should ensure multi-threads safety. */
+@NotThreadSafe
 public class SharedTsBlockQueue {
 
   private static final Logger logger = LoggerFactory.getLogger(SharedTsBlockQueue.class);
@@ -41,22 +43,16 @@ public class SharedTsBlockQueue {
   private final TFragmentInstanceId localFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
 
-  @GuardedBy("this")
   private boolean noMoreTsBlocks = false;
 
-  @GuardedBy("this")
   private long bufferRetainedSizeInBytes = 0L;
 
-  @GuardedBy("this")
   private final Queue<TsBlock> queue = new LinkedList<>();
 
-  @GuardedBy("this")
   private SettableFuture<Void> blocked = SettableFuture.create();
 
-  @GuardedBy("this")
   private ListenableFuture<Void> blockedOnMemory;
 
-  @GuardedBy("this")
   private boolean destroyed = false;
 
   private LocalSourceHandle sourceHandle;
@@ -70,7 +66,7 @@ public class SharedTsBlockQueue {
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
   }
 
-  public synchronized boolean hasNoMoreTsBlocks() {
+  public boolean hasNoMoreTsBlocks() {
     return noMoreTsBlocks;
   }
 
@@ -82,7 +78,7 @@ public class SharedTsBlockQueue {
     return blocked;
   }
 
-  public synchronized boolean isEmpty() {
+  public boolean isEmpty() {
     return queue.isEmpty();
   }
 
@@ -95,7 +91,8 @@ public class SharedTsBlockQueue {
   }
 
   /** Notify no more tsblocks will be added to the queue. */
-  public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+  public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+    logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
     if (destroyed) {
       throw new IllegalStateException("queue has been destroyed");
     }
@@ -112,7 +109,7 @@ public class SharedTsBlockQueue {
    * Remove a tsblock from the head of the queue and return. Should be invoked only when the future
    * returned by {@link #isBlocked()} completes.
    */
-  public synchronized TsBlock remove() {
+  public TsBlock remove() {
     if (destroyed) {
       throw new IllegalStateException("queue has been destroyed");
     }
@@ -136,7 +133,7 @@ public class SharedTsBlockQueue {
    * Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
    * the returned future of last invocation completes.
    */
-  public synchronized ListenableFuture<Void> add(TsBlock tsBlock) {
+  public ListenableFuture<Void> add(TsBlock tsBlock) {
     if (destroyed) {
       throw new IllegalStateException("queue has been destroyed");
     }
@@ -156,7 +153,7 @@ public class SharedTsBlockQueue {
   }
 
   /** Destroy the queue and cancel the future. */
-  public synchronized void destroy() {
+  public void destroy() {
     if (destroyed) {
       return;
     }

Reply via email to