This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fb02f12401b [IOTDB-5963] Make sure that TsBlock blocked on memory is
added in queue before the next TsBlock returned by root operator[IOTDB-5963]
Make sure that TsBlock blocked on memory is added in queue before the next
TsBlock returned by root operator[IOTDB-5963] Make sure that TsBlock blocked on
memory is added in queue before the next TsBlock returned by root operator
fb02f12401b is described below
commit fb02f12401be1f528044b42ed677aadb39e6c27d
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Jun 2 09:13:48 2023 +0800
[IOTDB-5963] Make sure that TsBlock blocked on memory is added in queue
before the next TsBlock returned by root operator[IOTDB-5963] Make sure that
TsBlock blocked on memory is added in queue before the next TsBlock returned by
root operator[IOTDB-5963] Make sure that TsBlock blocked on memory is added in
queue before the next TsBlock returned by root operator
---
.../apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 905199f76e6..9c3ac0f5e74 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -231,6 +231,7 @@ public class SharedTsBlockQueue {
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
+ SettableFuture<Void> channelBlocked = SettableFuture.create();
blockedOnMemory.addListener(
() -> {
synchronized (this) {
@@ -238,6 +239,7 @@ public class SharedTsBlockQueue {
if (!blocked.isDone()) {
blocked.set(null);
}
+ channelBlocked.set(null);
}
},
// Use directExecutor() here could lead to deadlock. Thread A holds
lock of
@@ -246,14 +248,14 @@ public class SharedTsBlockQueue {
// Thread B holds lock of SharedTsBlockQueueB and tries to invoke
the listener of
// SharedTsBlockQueueA
executorService);
+ return channelBlocked;
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
}
+ return blockedOnMemory;
}
-
- return blockedOnMemory;
}
/** Destroy the queue and complete the future. Should only be called in
normal case */