This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ffc0d6884d4 [Fix](load) Fix the channel leak when close wait has been
cancelled #38031 (#38125)
ffc0d6884d4 is described below
commit ffc0d6884d42eb296ff5be155176dc8ffb711291
Author: Xin Liao <[email protected]>
AuthorDate: Fri Jul 19 22:58:54 2024 +0800
[Fix](load) Fix the channel leak when close wait has been cancelled #38031
(#38125)
cherry pick from #38031
---
be/src/vec/sink/writer/vtablet_writer.cpp | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 487bd60b838..51e80a615e3 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -878,11 +878,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
Status VNodeChannel::close_wait(RuntimeState* state) {
DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
MemoryReclamation::process_full_gc(); });
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
- // set _is_closed to true finally
- Defer set_closed {[&]() {
- std::lock_guard<std::mutex> l(_closed_lock);
- _is_closed = true;
- }};
auto st = none_of({_cancelled, !_eos_is_produced});
if (!st.ok()) {
@@ -906,8 +901,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
_close_time_ms = UnixMillis() - _close_time_ms;
- if (_cancelled || state->is_cancelled()) {
- cancel(state->cancel_reason());
+ if (state->is_cancelled()) {
+ _cancel_with_msg(state->cancel_reason());
}
if (_add_batches_finished) {
@@ -919,6 +914,11 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows,
_node_id);
_index_channel->set_tablets_filtered_rows(_tablets_filtered_rows,
_node_id);
+
+ std::lock_guard<std::mutex> l(_closed_lock);
+ // only when normal close, we set _is_closed to true.
+ // otherwise, we will set it to true in cancel().
+ _is_closed = true;
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]