This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_local_sink
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/fix_local_sink by 
this push:
     new 4c0ec7b4b6 fix the bug that SourceHandle's finish won't be triggered
4c0ec7b4b6 is described below

commit 4c0ec7b4b6fc5faa390cba334c3111b125bfd535
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jun 15 11:45:04 2022 +0800

    fix the bug that SourceHandle's finish won't be triggered
---
 .../db/mpp/execution/datatransfer/LocalSourceHandle.java      | 11 ++++++++---
 .../db/mpp/execution/datatransfer/SharedTsBlockQueue.java     |  9 +++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
index 14a0ebc664..463db59e06 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
@@ -55,6 +55,7 @@ public class LocalSourceHandle implements ISourceHandle {
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.queue = Validate.notNull(queue);
+    this.queue.setConsumer(this);
     this.sourceHandleListener = Validate.notNull(sourceHandleListener);
     this.threadName =
         createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + 
"SourceHandle");
@@ -88,9 +89,7 @@ public class LocalSourceHandle implements ISourceHandle {
       synchronized (this) {
         tsBlock = queue.remove();
       }
-      if (isFinished()) {
-        sourceHandleListener.onFinished(this);
-      }
+      checkAndInvokeOnFinished();
       return tsBlock;
     }
   }
@@ -100,6 +99,12 @@ public class LocalSourceHandle implements ISourceHandle {
     return queue.hasNoMoreTsBlocks() && queue.isEmpty();
   }
 
+  public void checkAndInvokeOnFinished() {
+    if (isFinished()) {
+      sourceHandleListener.onFinished(this);
+    }
+  }
+
   @Override
   public ListenableFuture<Void> isBlocked() {
     if (aborted) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index 3d991b1aa9..f66ed30ebd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -55,6 +55,8 @@ public class SharedTsBlockQueue {
   @GuardedBy("this")
   private boolean destroyed = false;
 
+  private LocalSourceHandle consumer;
+
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId, LocalMemoryManager 
localMemoryManager) {
     this.localFragmentInstanceId =
@@ -79,12 +81,19 @@ public class SharedTsBlockQueue {
     return queue.isEmpty();
   }
 
+  public void setConsumer(LocalSourceHandle consumer) {
+    this.consumer = consumer;
+  }
+
   /** Notify no more tsblocks will be added to the queue. */
   public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     if (destroyed) {
       throw new IllegalStateException("queue has been destroyed");
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
+    if (this.consumer != null) {
+      this.consumer.checkAndInvokeOnFinished();
+    }
   }
 
   /**

Reply via email to