This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f55dd2919bc [Fix](load) Fix the channel leak when close wait has been cancelled #38031 (#38186)
f55dd2919bc is described below

commit f55dd2919bcc8e32825a09ec4f2530b76bb02414
Author: Xin Liao <[email protected]>
AuthorDate: Mon Jul 22 17:19:38 2024 +0800

    [Fix](load) Fix the channel leak when close wait has been cancelled #38031 (#38186)
    
    cherry pick from #38031
---
 be/src/vec/sink/vtablet_sink.cpp | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index c684d251a3c..4bf4cb0ff09 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -865,12 +865,6 @@ bool VNodeChannel::is_send_data_rpc_done() const {
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    // set _is_closed to true finally
-    Defer set_closed {[&]() {
-        std::lock_guard<std::mutex> l(_closed_lock);
-        _is_closed = true;
-    }};
-
     auto st = none_of({_cancelled, !_eos_is_produced});
     if (!st.ok()) {
         if (_cancelled) {
@@ -891,8 +885,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     }
     _close_time_ms = UnixMillis() - _close_time_ms;
 
-    if (_cancelled || state->is_cancelled()) {
-        cancel(state->cancel_reason());
+    if (state->is_cancelled()) {
+        _cancel_with_msg(state->cancel_reason());
     }
 
     if (_add_batches_finished) {
@@ -904,6 +898,10 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
         _index_channel->set_error_tablet_in_state(state);
         _index_channel->set_tablets_received_rows(_tablets_received_rows, 
_node_id);
         _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, 
_node_id);
+        std::lock_guard<std::mutex> l(_closed_lock);
+        // only when normal close, we set _is_closed to true.
+        // otherwise, we will set it to true in cancel().
+        _is_closed = true;
         return Status::OK();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to