This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 33ec829128e [fix](pipeline) Fix mem control in local exchanger (#38885)
33ec829128e is described below
commit 33ec829128e352b5a3bdd3ab21b0dc6bcd589643
Author: Gabriel <[email protected]>
AuthorDate: Tue Aug 6 09:59:35 2024 +0800
[fix](pipeline) Fix mem control in local exchanger (#38885)
## Proposed changes
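If a block larger than 128M is dequeued by the local exchange source
operator and it is the last block, both the source operators and the
sink operators hang. The limit checks compared the value returned by
`fetch_add`/`fetch_sub`, which is the memory usage before the update;
this PR makes them compare the usage after the update.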
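
The hang can be reproduced in isolation with a minimal sketch. Everything below is hypothetical and only illustrates the comparison being changed: `ExchangeMemTracker`, `kLimit`, the `sink_blocked` flag and `sink_stuck_after_drain` are stand-ins, not names from `dependency.h`, and the 128 MB limit is assumed from the description above. The sketch also ignores scheduling, so a blocked sink is still allowed to enqueue.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the shared local-exchange state.
struct ExchangeMemTracker {
    static constexpr int64_t kLimit = 128LL * 1024 * 1024; // assumed limit
    std::atomic<int64_t> mem_usage{0};
    bool sink_blocked = false;
    bool compare_new_value; // true: post-fix check, false: pre-fix check

    explicit ExchangeMemTracker(bool fixed) : compare_new_value(fixed) {}

    void add(int64_t delta) {
        // fetch_add returns the value held BEFORE the addition.
        int64_t prev = mem_usage.fetch_add(delta);
        int64_t observed = compare_new_value ? prev + delta : prev;
        if (observed > kLimit) sink_blocked = true;
    }

    void sub(int64_t delta) {
        // fetch_sub returns the value held BEFORE the subtraction.
        int64_t prev = mem_usage.fetch_sub(delta);
        int64_t observed = compare_new_value ? prev - delta : prev;
        if (observed <= kLimit) sink_blocked = false;
    }
};

// Enqueue an oversized block and a small one, drain both with the
// oversized block last, and report whether the sink is still blocked.
inline bool sink_stuck_after_drain(bool fixed) {
    ExchangeMemTracker t(fixed);
    const int64_t big = 130LL * 1024 * 1024; // larger than kLimit
    const int64_t small = 1024;
    t.add(big);
    t.add(small);
    t.sub(small);
    t.sub(big); // last block leaves the buffer
    assert(t.mem_usage.load() == 0); // buffer is empty either way
    return t.sink_blocked;
}
```

With the pre-fix comparison, `sink_stuck_after_drain(false)` reports the sink as still blocked even though the buffer is empty, because the final `sub` sees the old usage, which is above the limit. With the post-fix comparison, `sink_stuck_after_drain(true)` reports it as unblocked.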
---
be/src/pipeline/dependency.h | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 1e29cf904c7..9c6f9d85dd8 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -868,13 +868,13 @@ public:
}
void add_total_mem_usage(size_t delta) {
- if (mem_usage.fetch_add(delta) >
config::local_exchange_buffer_mem_limit) {
+ if (mem_usage.fetch_add(delta) + delta >
config::local_exchange_buffer_mem_limit) {
sink_deps.front()->block();
}
}
void sub_total_mem_usage(size_t delta) {
- if (mem_usage.fetch_sub(delta) <=
config::local_exchange_buffer_mem_limit) {
+ if (mem_usage.fetch_sub(delta) - delta <=
config::local_exchange_buffer_mem_limit) {
sink_deps.front()->set_ready();
}
}