This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch LocalSourceHandleNPE in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 08b454b028006197cf8973b1a707a9ed0bffed69 Author: JackieTien97 <[email protected]> AuthorDate: Mon Aug 22 16:14:13 2022 +0800 [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle --- .../mpp/execution/exchange/SharedTsBlockQueue.java | 38 +++++++++++++++++----- .../db/mpp/execution/exchange/SourceHandle.java | 10 +++--- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index aacffe4796..1222ee5ef0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -34,6 +35,9 @@ import javax.annotation.concurrent.NotThreadSafe; import java.util.LinkedList; import java.util.Queue; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + /** This is not thread safe class, the caller should ensure multi-threads safety. */ @NotThreadSafe public class SharedTsBlockQueue { @@ -94,7 +98,8 @@ public class SharedTsBlockQueue { public void setNoMoreTsBlocks(boolean noMoreTsBlocks) { logger.info("SharedTsBlockQueue receive no more TsBlocks signal."); if (closed) { - throw new IllegalStateException("queue has been destroyed"); + logger.warn("queue has been destroyed"); + return; } this.noMoreTsBlocks = noMoreTsBlocks; if (!blocked.isDone()) { @@ -135,21 +140,38 @@ public class SharedTsBlockQueue { */ public ListenableFuture<Void> add(TsBlock tsBlock) { if (closed) { - throw new IllegalStateException("queue has been destroyed"); + logger.warn("queue has been destroyed"); + return immediateVoidFuture(); } Validate.notNull(tsBlock, "TsBlock cannot be null"); Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full"); - blockedOnMemory = + Pair<ListenableFuture<Void>, Boolean> pair = localMemoryManager .getQueryPool() - .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes()) - .left; + .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes()); + blockedOnMemory = pair.left; bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes(); - queue.add(tsBlock); - if (!blocked.isDone()) { - blocked.set(null); + + // reserve memory failed, we should wait until there is enough memory + if (!pair.right) { + blockedOnMemory.addListener( + () -> { + synchronized (this) { + queue.add(tsBlock); + if (!blocked.isDone()) { + blocked.set(null); + } + } + }, + directExecutor()); + } else { // reserve memory succeeded, add the TsBlock directly + queue.add(tsBlock); + if (!blocked.isDone()) { + blocked.set(null); + } } + return blockedOnMemory; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index 1e4fa305e4..c0a353f8b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -129,13 +129,11 @@ public class SourceHandle implements ISourceHandle { if (tsBlock == null) { return null; } - logger.info( - "Receive {} TsBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes()); + long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + logger.info("Receive {} TsBlock, size is {}", currSequenceId, retainedSize); currSequenceId += 1; - bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes(); - localMemoryManager - .getQueryPool() - .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes()); + bufferRetainedSizeInBytes -= retainedSize; + localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize); if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { logger.info("no buffered TsBlock, blocked");
