This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5a623d87f95 [Fix](load) fix load channel leak when load exception 
occurs (#30915) (#30920)
5a623d87f95 is described below

commit 5a623d87f956a098bbcd7a728e31dfc308c8bf43
Author: Xin Liao <[email protected]>
AuthorDate: Tue Feb 6 19:37:03 2024 +0800

    [Fix](load) fix load channel leak when load exception occurs (#30915) 
(#30920)
---
 be/src/vec/sink/vtablet_sink.cpp | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 606aded9d35..56e1829202c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -292,6 +292,8 @@ void VNodeChannel::clear_all_blocks() {
     _cur_mutable_block.reset();
 }
 
+// we don't need to send tablet_writer_cancel rpc request when
+// init failed, so set _is_closed to true.
 // if "_cancelled" is set to true,
 // no need to set _cancel_msg because the error will be
 // returned directly via "TabletSink::prepare()" method.
@@ -302,6 +304,7 @@ Status VNodeChannel::init(RuntimeState* state) {
     auto node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         _cancelled = true;
+        _is_closed = true;
         return Status::InternalError("unknown node id, id={}", _node_id);
     }
 
@@ -317,6 +320,7 @@ Status VNodeChannel::init(RuntimeState* state) {
                                                                         
_node_info.brpc_port);
     if (_stub == nullptr) {
         _cancelled = true;
+        _is_closed = true;
         return Status::InternalError("Get rpc stub failed, host={}, port={}, 
info={}",
                                      _node_info.host, _node_info.brpc_port, 
channel_info());
     }
@@ -831,8 +835,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
     _next_packet_seq++;
 }
 
+// When _cancelled is true, we still need to send a tablet_writer_cancel
+// rpc request to truly release the load channel
 void VNodeChannel::cancel(const std::string& cancel_msg) {
-    if (_is_closed || _cancelled) {
+    if (_is_closed) {
         // skip the channels that have been canceled or close_wait.
         return;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to