This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_local_sink in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 072c82ee85fd87cd6420e9d24737c627c346d751 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Jun 15 16:49:40 2022 +0800 fix the issues in LocalSinkHandle/LocalSourceHandle --- .../execution/datatransfer/LocalSinkHandle.java | 11 +++---- .../execution/datatransfer/LocalSourceHandle.java | 18 ++++++++--- .../execution/datatransfer/SharedTsBlockQueue.java | 36 +++++++++++++++++++++- 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java index d1a8ac33f1..a16edf5573 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java @@ -99,6 +99,7 @@ public class LocalSinkHandle implements ISinkHandle { if (queue.hasNoMoreTsBlocks()) { return; } + logger.info("send TsBlocks. Size: {}", tsBlocks.size()); for (TsBlock tsBlock : tsBlocks) { blocked = queue.add(tsBlock); } @@ -111,23 +112,21 @@ public class LocalSinkHandle implements ISinkHandle { @Override public synchronized void setNoMoreTsBlocks() { - logger.info("Set no-more-tsblocks."); + logger.info("set noMoreTsBlocks."); if (aborted) { return; } queue.setNoMoreTsBlocks(true); sinkHandleListener.onEndOfBlocks(this); - if (isFinished()) { - sinkHandleListener.onFinish(this); - } - logger.info("No-more-tsblocks has been set."); + sinkHandleListener.onFinish(this); + logger.info("noMoreTsBlocks has been set."); } @Override public synchronized void abort() { logger.info("Sink handle is being aborted."); aborted = true; - queue.destroy(); + queue.producerFinished(); sinkHandleListener.onAborted(this); logger.info("Sink handle is aborted"); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java index 14a0ebc664..768797ba14 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java @@ -55,6 +55,7 @@ public class LocalSourceHandle implements ISourceHandle { this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId); this.localPlanNodeId = Validate.notNull(localPlanNodeId); this.queue = Validate.notNull(queue); + this.queue.setConsumer(this); this.sourceHandleListener = Validate.notNull(sourceHandleListener); this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + "SourceHandle"); @@ -88,9 +89,7 @@ public class LocalSourceHandle implements ISourceHandle { synchronized (this) { tsBlock = queue.remove(); } - if (isFinished()) { - sourceHandleListener.onFinished(this); - } + checkAndInvokeOnFinished(); return tsBlock; } } @@ -100,6 +99,17 @@ public class LocalSourceHandle implements ISourceHandle { return queue.hasNoMoreTsBlocks() && queue.isEmpty(); } + public void checkAndInvokeOnFinished() { + if (isFinished()) { + // Putting synchronized here rather than marking in method is to avoid deadlock. + // There are two locks need to invoke this method. One is lock of SharedTsBlockQueue, + // the other is lock of LocalSourceHandle. + synchronized(this) { + sourceHandleListener.onFinished(this); + } + } + } + @Override public ListenableFuture<Void> isBlocked() { if (aborted) { @@ -119,7 +129,7 @@ public class LocalSourceHandle implements ISourceHandle { if (aborted) { return; } - queue.destroy(); + queue.consumerFinished(); aborted = true; sourceHandleListener.onAborted(this); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java index 3d991b1aa9..93c2ff4c9d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java @@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; @@ -34,6 +36,8 @@ import java.util.Queue; public class SharedTsBlockQueue { + private static final Logger logger = LoggerFactory.getLogger(SharedTsBlockQueue.class); + private final TFragmentInstanceId localFragmentInstanceId; private final LocalMemoryManager localMemoryManager; @@ -55,6 +59,10 @@ public class SharedTsBlockQueue { @GuardedBy("this") private boolean destroyed = false; + private LocalSourceHandle consumer; + private boolean consumerFinished; + private boolean producerFinished; + public SharedTsBlockQueue( TFragmentInstanceId fragmentInstanceId, LocalMemoryManager localMemoryManager) { this.localFragmentInstanceId = @@ -79,12 +87,22 @@ public class SharedTsBlockQueue { return queue.isEmpty(); } + public void setConsumer(LocalSourceHandle consumer) { + this.consumer = consumer; + } + /** Notify no more tsblocks will be added to the queue. */ public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) { if (destroyed) { throw new IllegalStateException("queue has been destroyed"); } this.noMoreTsBlocks = noMoreTsBlocks; + if (!blocked.isDone()) { + blocked.set(null); + } + if (this.consumer != null) { + this.consumer.checkAndInvokeOnFinished(); + } } /** @@ -115,7 +133,7 @@ public class SharedTsBlockQueue { throw new IllegalStateException("queue has been destroyed"); } - Validate.notNull(tsBlock, "tsblock cannot be null"); + Validate.notNull(tsBlock, "TsBlock cannot be null"); Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full"); blockedOnMemory = localMemoryManager @@ -129,6 +147,22 @@ public class SharedTsBlockQueue { return blockedOnMemory; } + public synchronized void consumerFinished() { + this.consumerFinished = true; + tryDestroy(); + } + + public synchronized void producerFinished() { + this.producerFinished = true; + tryDestroy(); + } + + private void tryDestroy() { + if (this.consumerFinished && this.producerFinished) { + destroy(); + } + } + /** Destroy the queue and cancel the future. */ public synchronized void destroy() { if (destroyed) {
