This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 5a623d87f95 [Fix](load) fix load channel leak when load exception
occurs (#30915) (#30920)
5a623d87f95 is described below
commit 5a623d87f956a098bbcd7a728e31dfc308c8bf43
Author: Xin Liao <[email protected]>
AuthorDate: Tue Feb 6 19:37:03 2024 +0800
[Fix](load) fix load channel leak when load exception occurs (#30915)
(#30920)
---
be/src/vec/sink/vtablet_sink.cpp | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 606aded9d35..56e1829202c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -292,6 +292,8 @@ void VNodeChannel::clear_all_blocks() {
_cur_mutable_block.reset();
}
+// we don't need to send tablet_writer_cancel rpc request when
+// init failed, so set _is_closed to true.
// if "_cancelled" is set to true,
// no need to set _cancel_msg because the error will be
// returned directly via "TabletSink::prepare()" method.
@@ -302,6 +304,7 @@ Status VNodeChannel::init(RuntimeState* state) {
auto node = _parent->_nodes_info->find_node(_node_id);
if (node == nullptr) {
_cancelled = true;
+ _is_closed = true;
return Status::InternalError("unknown node id, id={}", _node_id);
}
@@ -317,6 +320,7 @@ Status VNodeChannel::init(RuntimeState* state) {
_node_info.brpc_port);
if (_stub == nullptr) {
_cancelled = true;
+ _is_closed = true;
return Status::InternalError("Get rpc stub failed, host={}, port={},
info={}",
_node_info.host, _node_info.brpc_port,
channel_info());
}
@@ -831,8 +835,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
_next_packet_seq++;
}
+// When _cancelled is true, we still need to send a tablet_writer_cancel
+// rpc request to truly release the load channel
void VNodeChannel::cancel(const std::string& cancel_msg) {
- if (_is_closed || _cancelled) {
+ if (_is_closed) {
// skip the channels that have been canceled or close_wait.
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]