This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_local_sink
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/fix_local_sink by
this push:
new 4c0ec7b4b6 fix the bug that SourceHandle's finish won't be triggered
4c0ec7b4b6 is described below
commit 4c0ec7b4b6fc5faa390cba334c3111b125bfd535
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jun 15 11:45:04 2022 +0800
fix the bug that SourceHandle's finish won't be triggered
---
.../db/mpp/execution/datatransfer/LocalSourceHandle.java | 11 ++++++++---
.../db/mpp/execution/datatransfer/SharedTsBlockQueue.java | 9 +++++++++
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
index 14a0ebc664..463db59e06 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
@@ -55,6 +55,7 @@ public class LocalSourceHandle implements ISourceHandle {
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.queue = Validate.notNull(queue);
+ this.queue.setConsumer(this);
this.sourceHandleListener = Validate.notNull(sourceHandleListener);
this.threadName =
createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." +
"SourceHandle");
@@ -88,9 +89,7 @@ public class LocalSourceHandle implements ISourceHandle {
synchronized (this) {
tsBlock = queue.remove();
}
- if (isFinished()) {
- sourceHandleListener.onFinished(this);
- }
+ checkAndInvokeOnFinished();
return tsBlock;
}
}
@@ -100,6 +99,12 @@ public class LocalSourceHandle implements ISourceHandle {
return queue.hasNoMoreTsBlocks() && queue.isEmpty();
}
+ public void checkAndInvokeOnFinished() {
+ if (isFinished()) {
+ sourceHandleListener.onFinished(this);
+ }
+ }
+
@Override
public ListenableFuture<Void> isBlocked() {
if (aborted) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index 3d991b1aa9..f66ed30ebd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -55,6 +55,8 @@ public class SharedTsBlockQueue {
@GuardedBy("this")
private boolean destroyed = false;
+ private LocalSourceHandle consumer;
+
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId, LocalMemoryManager
localMemoryManager) {
this.localFragmentInstanceId =
@@ -79,12 +81,19 @@ public class SharedTsBlockQueue {
return queue.isEmpty();
}
+ public void setConsumer(LocalSourceHandle consumer) {
+ this.consumer = consumer;
+ }
+
/** Notify no more tsblocks will be added to the queue. */
public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
if (destroyed) {
throw new IllegalStateException("queue has been destroyed");
}
this.noMoreTsBlocks = noMoreTsBlocks;
+ if (this.consumer != null) {
+ this.consumer.checkAndInvokeOnFinished();
+ }
}
/**