This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_local_sink
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 072c82ee85fd87cd6420e9d24737c627c346d751
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jun 15 16:49:40 2022 +0800

    fix the issues in LocalSinkHandle/LocalSourceHandle
---
 .../execution/datatransfer/LocalSinkHandle.java    | 11 +++----
 .../execution/datatransfer/LocalSourceHandle.java  | 18 ++++++++---
 .../execution/datatransfer/SharedTsBlockQueue.java | 36 +++++++++++++++++++++-
 3 files changed, 54 insertions(+), 11 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
index d1a8ac33f1..a16edf5573 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
@@ -99,6 +99,7 @@ public class LocalSinkHandle implements ISinkHandle {
     if (queue.hasNoMoreTsBlocks()) {
       return;
     }
+    logger.info("send TsBlocks. Size: {}", tsBlocks.size());
     for (TsBlock tsBlock : tsBlocks) {
       blocked = queue.add(tsBlock);
     }
@@ -111,23 +112,21 @@ public class LocalSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void setNoMoreTsBlocks() {
-    logger.info("Set no-more-tsblocks.");
+    logger.info("set noMoreTsBlocks.");
     if (aborted) {
       return;
     }
     queue.setNoMoreTsBlocks(true);
     sinkHandleListener.onEndOfBlocks(this);
-    if (isFinished()) {
-      sinkHandleListener.onFinish(this);
-    }
-    logger.info("No-more-tsblocks has been set.");
+    sinkHandleListener.onFinish(this);
+    logger.info("noMoreTsBlocks has been set.");
   }
 
   @Override
   public synchronized void abort() {
     logger.info("Sink handle is being aborted.");
     aborted = true;
-    queue.destroy();
+    queue.producerFinished();
     sinkHandleListener.onAborted(this);
     logger.info("Sink handle is aborted");
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
index 14a0ebc664..768797ba14 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
@@ -55,6 +55,7 @@ public class LocalSourceHandle implements ISourceHandle {
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.queue = Validate.notNull(queue);
+    this.queue.setConsumer(this);
     this.sourceHandleListener = Validate.notNull(sourceHandleListener);
     this.threadName =
         createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + 
"SourceHandle");
@@ -88,9 +89,7 @@ public class LocalSourceHandle implements ISourceHandle {
       synchronized (this) {
         tsBlock = queue.remove();
       }
-      if (isFinished()) {
-        sourceHandleListener.onFinished(this);
-      }
+      checkAndInvokeOnFinished();
       return tsBlock;
     }
   }
@@ -100,6 +99,17 @@ public class LocalSourceHandle implements ISourceHandle {
     return queue.hasNoMoreTsBlocks() && queue.isEmpty();
   }
 
+  public void checkAndInvokeOnFinished() {
+    if (isFinished()) {
+      // Synchronize here rather than on the method declaration to avoid deadlock.
+      // Two locks are involved when invoking this method: the lock of
+      // SharedTsBlockQueue and the lock of LocalSourceHandle.
+      synchronized(this) {
+        sourceHandleListener.onFinished(this);
+      }
+    }
+  }
+
   @Override
   public ListenableFuture<Void> isBlocked() {
     if (aborted) {
@@ -119,7 +129,7 @@ public class LocalSourceHandle implements ISourceHandle {
       if (aborted) {
         return;
       }
-      queue.destroy();
+      queue.consumerFinished();
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index 3d991b1aa9..93c2ff4c9d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -34,6 +36,8 @@ import java.util.Queue;
 
 public class SharedTsBlockQueue {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(SharedTsBlockQueue.class);
+
   private final TFragmentInstanceId localFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
 
@@ -55,6 +59,10 @@ public class SharedTsBlockQueue {
   @GuardedBy("this")
   private boolean destroyed = false;
 
+  private LocalSourceHandle consumer;
+  private boolean consumerFinished;
+  private boolean producerFinished;
+
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId, LocalMemoryManager 
localMemoryManager) {
     this.localFragmentInstanceId =
@@ -79,12 +87,22 @@ public class SharedTsBlockQueue {
     return queue.isEmpty();
   }
 
+  public void setConsumer(LocalSourceHandle consumer) {
+    this.consumer = consumer;
+  }
+
   /** Notify no more tsblocks will be added to the queue. */
   public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     if (destroyed) {
       throw new IllegalStateException("queue has been destroyed");
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
+    if (!blocked.isDone()) {
+      blocked.set(null);
+    }
+    if (this.consumer != null) {
+      this.consumer.checkAndInvokeOnFinished();
+    }
   }
 
   /**
@@ -115,7 +133,7 @@ public class SharedTsBlockQueue {
       throw new IllegalStateException("queue has been destroyed");
     }
 
-    Validate.notNull(tsBlock, "tsblock cannot be null");
+    Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), 
"queue is full");
     blockedOnMemory =
         localMemoryManager
@@ -129,6 +147,22 @@ public class SharedTsBlockQueue {
     return blockedOnMemory;
   }
 
+  public synchronized void consumerFinished() {
+    this.consumerFinished = true;
+    tryDestroy();
+  }
+
+  public synchronized void producerFinished() {
+    this.producerFinished = true;
+    tryDestroy();
+  }
+
+  private void tryDestroy() {
+    if (this.consumerFinished && this.producerFinished) {
+      destroy();
+    }
+  }
+
   /** Destroy the queue and cancel the future. */
   public synchronized void destroy() {
     if (destroyed) {

Reply via email to