vagetablechicken commented on a change in pull request #3143: Non blocking
OlapTableSink
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395963965
##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -128,55 +129,128 @@ Status NodeChannel::open_wait() {
_open_closure = nullptr;
// add batch closure
- _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>();
- _add_batch_closure->ref();
+ _add_batch_closure =
ReusableClosure<PTabletWriterAddBatchResult>::create();
+ _add_batch_closure->addFailedHandler([this]() {
+ _rpc_error = true;
+ LOG(WARNING) << "NodeChannel add batch req rpc failed, load_id=" <<
_parent->_load_id
+ << ", node=" << node_info()->host << ":" <<
node_info()->brpc_port;
+ });
+
+ _add_batch_closure->addSuccessHandler(
+ [this](const PTabletWriterAddBatchResult& result, bool
is_last_rpc) {
+ Status status(result.status());
+ if (status.ok()) {
+ if (is_last_rpc) {
+ for (auto& tablet : result.tablet_vec()) {
+ TTabletCommitInfo commit_info;
+ commit_info.tabletId = tablet.tablet_id();
+ commit_info.backendId = _node_id;
+
_tablet_commit_infos.emplace_back(std::move(commit_info));
+ }
+ _add_batches_finished = true;
+ LOG(INFO) << name() << " last rpc has responsed";
+ }
+ } else {
+ _rpc_error = true;
+ LOG(WARNING) << "NodeChannel add batch req success but
status not ok, load_id="
+ << _parent->_load_id << ", node=" <<
node_info()->host << ":"
+ << node_info()->brpc_port << ", errmsg=" <<
status.get_error_msg();
+ }
+
+ if (result.has_execution_time_us()) {
+ _add_batch_counter.add_batch_execution_time_us +=
result.execution_time_us();
+ _add_batch_counter.add_batch_wait_lock_time_us +=
result.wait_lock_time_us();
+ _add_batch_counter.add_batch_num++;
+ }
+ });
return status;
}
Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
- auto row_no = _batch->add_row();
+ // If add_row() when _eos_is_produced==true, there must be sth wrong, we
can only mark this channel as failed.
+ auto st = none_of({_rpc_error, _is_cancelled, _eos_is_produced});
+ if (!st.ok()) {
+ return st.clone_and_prepend("already stopped, can't add_row.
rpc_error/cancelled/eos: ");
+ }
+
+ auto row_no = _cur_batch->add_row();
if (row_no == RowBatch::INVALID_ROW_INDEX) {
- RETURN_IF_ERROR(_send_cur_batch());
- row_no = _batch->add_row();
+ {
+ SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+ std::lock_guard<std::mutex> l(_pending_batches_lock);
+ //To simplify the add_row logic, postpone adding batch into req
until the time of sending req
+ _pending_batches.emplace(std::move(_cur_batch),
_cur_add_batch_request);
Review comment:
_plan node in PlanFragmentExec may not limit the mem, so I add mem limit
check in NodeChannel::add_row, it'll block the main thread if mem limit
exceeded.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]