This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fb02f12401b [IOTDB-5963] Make sure that TsBlock blocked on memory is 
added in queue before the next TsBlock returned by root operator[IOTDB-5963] 
Make sure that TsBlock blocked on memory is added in queue before the next 
TsBlock returned by root operator[IOTDB-5963] Make sure that TsBlock blocked on 
memory is added in queue before the next TsBlock returned by root operator
fb02f12401b is described below

commit fb02f12401be1f528044b42ed677aadb39e6c27d
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Jun 2 09:13:48 2023 +0800

    [IOTDB-5963] Make sure that TsBlock blocked on memory is added in queue 
before the next TsBlock returned by root operator[IOTDB-5963] Make sure that 
TsBlock blocked on memory is added in queue before the next TsBlock returned by 
root operator[IOTDB-5963] Make sure that TsBlock blocked on memory is added in 
queue before the next TsBlock returned by root operator
---
 .../apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 905199f76e6..9c3ac0f5e74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -231,6 +231,7 @@ public class SharedTsBlockQueue {
 
     // reserve memory failed, we should wait until there is enough memory
     if (!pair.right) {
+      SettableFuture<Void> channelBlocked = SettableFuture.create();
       blockedOnMemory.addListener(
           () -> {
             synchronized (this) {
@@ -238,6 +239,7 @@ public class SharedTsBlockQueue {
               if (!blocked.isDone()) {
                 blocked.set(null);
               }
+              channelBlocked.set(null);
             }
           },
           // Use directExecutor() here could lead to deadlock. Thread A holds lock of
@@ -246,14 +248,14 @@ public class SharedTsBlockQueue {
           // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of
           // SharedTsBlockQueueA
           executorService);
+      return channelBlocked;
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
       if (!blocked.isDone()) {
         blocked.set(null);
       }
+      return blockedOnMemory;
     }
-
-    return blockedOnMemory;
   }
 
   /** Destroy the queue and complete the future. Should only be called in normal case */

Reply via email to