This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new fb02bb5 [Load] Fix mem limit in NodeChannel (#3643)
fb02bb5 is described below
commit fb02bb5cd9ed04834efda9f150d843b05885307d
Author: HuangWei <[email protected]>
AuthorDate: Fri May 22 09:11:59 2020 +0800
[Load] Fix mem limit in NodeChannel (#3643)
---
be/src/exec/tablet_sink.cpp | 10 +++++++++-
be/src/exec/tablet_sink.h | 1 +
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 45df64f..4941bb3 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -59,6 +59,7 @@ Status NodeChannel::init(RuntimeState* state) {
if (_node_info == nullptr) {
std::stringstream ss;
ss << "unknown node id, id=" << _node_id;
+ _cancelled = true;
return Status::InternalError(ss.str());
}
@@ -131,6 +132,11 @@ Status NodeChannel::open_wait() {
}
_open_closure = nullptr;
+ if (!status.ok()) {
+ _cancelled = true;
+ return status;
+ }
+
// add batch closure
_add_batch_closure =
ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this]() {
@@ -179,7 +185,9 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
// We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
// so in the ideal case, mem limit is a matter for _plan node.
// But there is still some unfinished things, we do mem limit here temporarily.
- while (_parent->_mem_tracker->any_limit_exceeded()) {
+ // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
+ // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
+ while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
SleepFor(MonoDelta::FromMilliseconds(10));
}
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 9ff18cf..47c9cfe 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -166,6 +166,7 @@ public:
// 0: stopped, send finished(eos request has been sent), or any internal error;
// 1: running, haven't reach eos.
// only allow 1 rpc in flight
+ // plz make sure, this func should be called after open_wait().
int try_send_and_fetch_status();
void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]