This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba91b44 [fix](load) fix bug that NodeChannel can not be destroyed
ontime (#8705)
ba91b44 is described below
commit ba91b4455381b05169726c60a0fbb45807c189b3
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Mar 30 09:52:11 2022 +0800
[fix](load) fix bug that NodeChannel can not be destroyed ontime (#8705)
After the ReusableClosure is reset, we cannot call its join() method, or it
will be blocked forever.
---
be/src/exec/tablet_sink.cpp | 6 +++++-
be/src/exec/tablet_sink.h | 2 +-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 821e3d5..adeeb94 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -504,7 +504,6 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
}
}
- _add_batch_closure->reset();
int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
@@ -514,6 +513,11 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
remain_ms = config::min_load_rpc_timeout_ms;
}
}
+
+ // After calling reset(), make sure that the rpc will be called finally.
+ // Otherwise, when calling _add_batch_closure->join(), it will be blocked forever.
+ // and _add_batch_closure->join() will be called in ~NodeChannel().
+ _add_batch_closure->reset();
_add_batch_closure->cntl.set_timeout_ms(remain_ms);
if (config::tablet_writer_ignore_eovercrowded) {
_add_batch_closure->cntl.ignore_eovercrowded();
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index e0161ba..e69cef0 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -100,7 +100,7 @@ public:
void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
void join() {
- if (cid != INVALID_BTHREAD_ID) {
+ if (cid != INVALID_BTHREAD_ID && _packet_in_flight) {
brpc::Join(cid);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]