This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 174ab624147 [To rel/1.1][IOTDB-5963] Make sure that TsBlock blocked on
memory is added in queue before the next TsBlock returned by root operator
174ab624147 is described below
commit 174ab6241474348c119eff950de1ca860b818b35
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Jun 2 13:42:51 2023 +0800
[To rel/1.1][IOTDB-5963] Make sure that TsBlock blocked on memory is added
in queue before the next TsBlock returned by root operator
---
.../apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 6566d767d43..66707cae90c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -224,6 +224,7 @@ public class SharedTsBlockQueue {
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
+ SettableFuture<Void> channelBlocked = SettableFuture.create();
blockedOnMemory.addListener(
() -> {
synchronized (this) {
@@ -231,6 +232,7 @@ public class SharedTsBlockQueue {
if (!blocked.isDone()) {
blocked.set(null);
}
+ channelBlocked.set(null);
}
},
// Use directExecutor() here could lead to deadlock. Thread A holds lock of
@@ -239,14 +241,14 @@ public class SharedTsBlockQueue {
// Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of
// SharedTsBlockQueueA
executorService);
+ return channelBlocked;
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
}
+ return blockedOnMemory;
}
-
- return blockedOnMemory;
}
/** Destroy the queue and complete the future. Should only be called in normal case */