This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 84e7ed6d6bb994d80f9753069a76bd38c9e14071
Author: Xin Liao <[email protected]>
AuthorDate: Tue Feb 6 19:35:58 2024 +0800

    [Fix](load) fix load channel leak when load exception occurs (#30915)
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index b256ec5210f..bfac8e2e7a9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -306,6 +306,8 @@ void VNodeChannel::clear_all_blocks() {
     _cur_mutable_block.reset();
 }
 
+// we don't need to send tablet_writer_cancel rpc request when
+// init failed, so set _is_closed to true.
 // if "_cancelled" is set to true,
 // no need to set _cancel_msg because the error will be
 // returned directly via "TabletSink::prepare()" method.
@@ -322,6 +324,7 @@ Status VNodeChannel::init(RuntimeState* state) {
     const auto* node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         _cancelled = true;
+        _is_closed = true;
         return Status::InternalError("unknown node id, id={}", _node_id);
     }
     _node_info = *node;
@@ -336,6 +339,7 @@ Status VNodeChannel::init(RuntimeState* state) {
                                                                         
_node_info.brpc_port);
     if (_stub == nullptr) {
         _cancelled = true;
+        _is_closed = true;
         return Status::InternalError("Get rpc stub failed, host={}, port={}, info={}",
                                      _node_info.host, _node_info.brpc_port, channel_info());
     }
@@ -839,8 +843,10 @@ void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) {
     }
 }
 
+// When _cancelled is true, we still need to send a tablet_writer_cancel
+// rpc request to truly release the load channel
 void VNodeChannel::cancel(const std::string& cancel_msg) {
-    if (_is_closed || _cancelled) {
+    if (_is_closed) {
         // skip the channels that have been canceled or close_wait.
         return;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to