This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 174ab624147 [To rel/1.1][IOTDB-5963] Make sure that TsBlock blocked on 
memory is added in queue before the next TsBlock returned by root operator
174ab624147 is described below

commit 174ab6241474348c119eff950de1ca860b818b35
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Jun 2 13:42:51 2023 +0800

    [To rel/1.1][IOTDB-5963] Make sure that TsBlock blocked on memory is added 
in queue before the next TsBlock returned by root operator
---
 .../apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 6566d767d43..66707cae90c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -224,6 +224,7 @@ public class SharedTsBlockQueue {
 
     // reserve memory failed, we should wait until there is enough memory
     if (!pair.right) {
+      SettableFuture<Void> channelBlocked = SettableFuture.create();
       blockedOnMemory.addListener(
           () -> {
             synchronized (this) {
@@ -231,6 +232,7 @@ public class SharedTsBlockQueue {
               if (!blocked.isDone()) {
                 blocked.set(null);
               }
+              channelBlocked.set(null);
             }
           },
           // Use directExecutor() here could lead to deadlock. Thread A holds 
lock of
@@ -239,14 +241,14 @@ public class SharedTsBlockQueue {
           // Thread B holds lock of SharedTsBlockQueueB and tries to invoke 
the listener of
           // SharedTsBlockQueueA
           executorService);
+      return channelBlocked;
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
       if (!blocked.isDone()) {
         blocked.set(null);
       }
+      return blockedOnMemory;
     }
-
-    return blockedOnMemory;
   }
 
   /** Destroy the queue and complete the future. Should only be called in 
normal case */

Reply via email to