This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 57428ae2b9 [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak 
in SourceHandle (#7082)
57428ae2b9 is described below

commit 57428ae2b999154b39a496f908db26ce5d367baa
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Aug 23 10:13:51 2022 +0800

    [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle 
(#7082)
---
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 38 +++++++++++++++++-----
 .../db/mpp/execution/exchange/SourceHandle.java    | 10 +++---
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index aacffe4796..1222ee5ef0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -34,6 +35,9 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.util.LinkedList;
 import java.util.Queue;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
 /** This is not thread safe class, the caller should ensure multi-threads 
safety. */
 @NotThreadSafe
 public class SharedTsBlockQueue {
@@ -94,7 +98,8 @@ public class SharedTsBlockQueue {
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
     if (closed) {
-      throw new IllegalStateException("queue has been destroyed");
+      logger.warn("queue has been destroyed");
+      return;
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
     if (!blocked.isDone()) {
@@ -135,21 +140,38 @@ public class SharedTsBlockQueue {
    */
   public ListenableFuture<Void> add(TsBlock tsBlock) {
     if (closed) {
-      throw new IllegalStateException("queue has been destroyed");
+      logger.warn("queue has been destroyed");
+      return immediateVoidFuture();
     }
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), 
"queue is full");
-    blockedOnMemory =
+    Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
-            .reserve(localFragmentInstanceId.getQueryId(), 
tsBlock.getRetainedSizeInBytes())
-            .left;
+            .reserve(localFragmentInstanceId.getQueryId(), 
tsBlock.getRetainedSizeInBytes());
+    blockedOnMemory = pair.left;
     bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
-    queue.add(tsBlock);
-    if (!blocked.isDone()) {
-      blocked.set(null);
+
+    // reserve memory failed, we should wait until there is enough memory
+    if (!pair.right) {
+      blockedOnMemory.addListener(
+          () -> {
+            synchronized (this) {
+              queue.add(tsBlock);
+              if (!blocked.isDone()) {
+                blocked.set(null);
+              }
+            }
+          },
+          directExecutor());
+    } else { // reserve memory succeeded, add the TsBlock directly
+      queue.add(tsBlock);
+      if (!blocked.isDone()) {
+        blocked.set(null);
+      }
     }
+
     return blockedOnMemory;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 1e4fa305e4..c0a353f8b9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -129,13 +129,11 @@ public class SourceHandle implements ISourceHandle {
       if (tsBlock == null) {
         return null;
       }
-      logger.info(
-          "Receive {} TsBlock, size is {}", currSequenceId, 
tsBlock.getRetainedSizeInBytes());
+      long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      logger.info("Receive {} TsBlock, size is {}", currSequenceId, 
retainedSize);
       currSequenceId += 1;
-      bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
-      localMemoryManager
-          .getQueryPool()
-          .free(localFragmentInstanceId.getQueryId(), 
tsBlock.getRetainedSizeInBytes());
+      bufferRetainedSizeInBytes -= retainedSize;
+      
localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), 
retainedSize);
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
         logger.info("no buffered TsBlock, blocked");

Reply via email to