This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 662ea3083f [bugfix](NodeChannel) fix OOM caused by pending queue in sink send
662ea3083f is described below
commit 662ea3083f8b88b97b6ea84f52f1833180777350
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 8 08:58:28 2022 +0800
[bugfix](NodeChannel) fix OOM caused by pending queue in sink send
---
be/src/vec/sink/vtablet_sink.cpp | 359 ---------------------------------------
1 file changed, 359 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 846237229a..f0239b23c6 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -27,365 +27,6 @@
namespace doris {
namespace stream_load {
-<<<<<<< HEAD
-=======
-VNodeChannel::VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
- : NodeChannel(parent, index_channel, node_id) {
- _is_vectorized = true;
-}
-
-VNodeChannel::~VNodeChannel() {
- if (_add_block_closure != nullptr) {
- delete _add_block_closure;
- _add_block_closure = nullptr;
- }
- _cur_add_block_request.release_id();
-}
-
-void VNodeChannel::clear_all_blocks() {
- std::lock_guard<std::mutex> lg(_pending_batches_lock);
- std::queue<AddBlockReq> empty;
- std::swap(_pending_blocks, empty);
- _cur_mutable_block.reset();
-}
-
-// if "_cancelled" is set to true,
-// no need to set _cancel_msg because the error will be
-// returned directly via "TabletSink::prepare()" method.
-Status VNodeChannel::init(RuntimeState* state) {
- RETURN_IF_ERROR(NodeChannel::init(state));
-
- _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
-
- // Initialize _cur_add_block_request
- _cur_add_block_request.set_allocated_id(&_parent->_load_id);
- _cur_add_block_request.set_index_id(_index_channel->_index_id);
- _cur_add_block_request.set_sender_id(_parent->_sender_id);
- _cur_add_block_request.set_backend_id(_node_id);
- _cur_add_block_request.set_eos(false);
-
- _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id);
-
- return Status::OK();
-}
-
-Status VNodeChannel::open_wait() {
- Status status = NodeChannel::open_wait();
- if (!status.ok()) {
- return status;
- }
-
- // add block closure
- _add_block_closure = ReusableClosure<PTabletWriterAddBlockResult>::create();
- _add_block_closure->addFailedHandler([this](bool is_last_rpc) {
- std::lock_guard<std::mutex> l(this->_closed_lock);
- if (this->_is_closed) {
- // if the node channel is closed, no need to call `mark_as_failed`,
- // and notice that _index_channel may already be destroyed.
- return;
- }
- // If rpc failed, mark all tablets on this node channel as failed
- _index_channel->mark_as_failed(this->node_id(), this->host(),
- _add_block_closure->cntl.ErrorText(), -1);
- Status st = _index_channel->check_intolerable_failure();
- if (!st.ok()) {
- _cancel_with_msg(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
- } else if (is_last_rpc) {
- // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait
- // will be blocked.
- _add_batches_finished = true;
- }
- });
-
- _add_block_closure->addSuccessHandler([this](const PTabletWriterAddBlockResult& result,
- bool is_last_rpc) {
- std::lock_guard<std::mutex> l(this->_closed_lock);
- if (this->_is_closed) {
- // if the node channel is closed, no need to call the following logic,
- // and notice that _index_channel may already be destroyed.
- return;
- }
- Status status(result.status());
- if (status.ok()) {
- // if has error tablet, handle them first
- for (auto& error : result.tablet_errors()) {
- _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(),
- error.tablet_id());
- }
-
- Status st = _index_channel->check_intolerable_failure();
- if (!st.ok()) {
- _cancel_with_msg(st.get_error_msg());
- } else if (is_last_rpc) {
- for (auto& tablet : result.tablet_vec()) {
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet.tablet_id();
- commit_info.backendId = _node_id;
- _tablet_commit_infos.emplace_back(std::move(commit_info));
- VLOG_CRITICAL << "master replica commit info: tabletId="
<< tablet.tablet_id()
- << ", backendId=" << _node_id
- << ", master node id: " << this->node_id()
- << ", host: " << this->host() << ", txn_id="
<< _parent->_txn_id;
- }
- if (_parent->_write_single_replica) {
- for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) {
- for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) {
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet_slave_node_ids.first;
- commit_info.backendId = slave_node_id;
- _tablet_commit_infos.emplace_back(std::move(commit_info));
- VLOG_CRITICAL << "slave replica commit info:
tabletId="
- << tablet_slave_node_ids.first
- << ", backendId=" << slave_node_id
- << ", master node id: " <<
this->node_id()
- << ", host: " << this->host()
- << ", txn_id=" << _parent->_txn_id;
- }
- }
- }
- _add_batches_finished = true;
- }
- } else {
- _cancel_with_msg(fmt::format("{}, add batch req success but status
isn't ok, err: {}",
- channel_info(),
status.get_error_msg()));
- }
-
- if (result.has_execution_time_us()) {
- _add_batch_counter.add_batch_execution_time_us += result.execution_time_us();
- _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us();
- _add_batch_counter.add_batch_num++;
- }
- });
- return status;
-}
-
-Status VNodeChannel::add_block(vectorized::Block* block,
- const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
- std::vector<int64_t>>& payload) {
- SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
- // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
- auto st = none_of({_cancelled, _eos_is_produced});
- if (!st.ok()) {
- if (_cancelled) {
- std::lock_guard<SpinLock> l(_cancel_msg_lock);
- return Status::InternalError("add row failed. {}", _cancel_msg);
- } else {
- return std::move(st.prepend("already stopped, can't add row.
cancelled/eos: "));
- }
- }
-
- // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
- // so in the ideal case, mem limit is a matter for _plan node.
- // But there is still some unfinished things, we do mem limit here temporarily.
- // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
- // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
- while (!_cancelled && _pending_batches_num > 0 &&
- _pending_batches_bytes > _max_pending_batches_bytes) {
- SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
-
- block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first));
- for (auto tablet_id : payload.second) {
- _cur_add_block_request.add_tablet_ids(tablet_id);
- }
-
- if (_cur_mutable_block->rows() >= _batch_size ||
- _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
- {
- SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
- std::lock_guard<std::mutex> l(_pending_batches_lock);
- // To simplify the add_row logic, postpone adding block into req until the time of sending req
- _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
- _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
- _pending_batches_num++;
- VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" <<
this
- << " pending_batches_bytes:" << _pending_batches_bytes
- << " jobid:" << std::to_string(_state->load_job_id())
- << " loadinfo:" << _load_info;
- }
-
- _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
- _cur_add_block_request.clear_tablet_ids();
- }
-
- return Status::OK();
-}
-
-int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
- std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
- auto st = none_of({_cancelled, _send_finished});
- if (!st.ok()) {
- return 0;
- }
-
- if (!_add_block_closure->try_set_in_flight()) {
- return _send_finished ? 0 : 1;
- }
-
- // We are sure that try_send_batch is not running
- if (_pending_batches_num > 0) {
- auto s = thread_pool_token->submit_func(
- std::bind(&VNodeChannel::try_send_block, this, state));
- if (!s.ok()) {
- _cancel_with_msg("submit send_batch task to send_batch_thread_pool
failed");
- // clear in flight
- _add_block_closure->clear_in_flight();
- }
- // in_flight is cleared in closure::Run
- } else {
- // clear in flight
- _add_block_closure->clear_in_flight();
- }
- return _send_finished ? 0 : 1;
-}
-
-void VNodeChannel::try_send_block(RuntimeState* state) {
- SCOPED_ATTACH_TASK(state);
- SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
- SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
- AddBlockReq send_block;
- {
- debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- std::lock_guard<std::mutex> l(_pending_batches_lock);
- DCHECK(!_pending_blocks.empty());
- send_block = std::move(_pending_blocks.front());
- _pending_blocks.pop();
- _pending_batches_num--;
- _pending_batches_bytes -= send_block.first->allocated_bytes();
- }
-
- auto mutable_block = std::move(send_block.first);
- auto request = std::move(send_block.second); // doesn't need to be saved in heap
-
- // tablet_ids has already set when add row
- request.set_packet_seq(_next_packet_seq);
- auto block = mutable_block->to_block();
- if (block.rows() > 0) {
- SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
- size_t uncompressed_bytes = 0, compressed_bytes = 0;
- Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes,
- state->fragement_transmission_compression_type(),
- _parent->_transfer_large_data_by_brpc);
- if (!st.ok()) {
- cancel(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
- _add_block_closure->clear_in_flight();
- return;
- }
- if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
- LOG(WARNING) << "send block too large, this rpc may failed. send
size: "
- << compressed_bytes << ", threshold: " <<
config::brpc_max_body_size
- << ", " << channel_info();
- }
- }
-
- int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
- if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
- if (remain_ms <= 0 && !request.eos()) {
- cancel(fmt::format("{}, err: timeout", channel_info()));
- _add_block_closure->clear_in_flight();
- return;
- } else {
- remain_ms = config::min_load_rpc_timeout_ms;
- }
- }
-
- _add_block_closure->reset();
- _add_block_closure->cntl.set_timeout_ms(remain_ms);
- if (config::tablet_writer_ignore_eovercrowded) {
- _add_block_closure->cntl.ignore_eovercrowded();
- }
-
- if (request.eos()) {
- for (auto pid : _parent->_partition_ids) {
- request.add_partition_ids(pid);
- }
-
- request.set_write_single_replica(false);
- if (_parent->_write_single_replica) {
- request.set_write_single_replica(true);
- for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator iter =
- _slave_tablet_nodes.begin();
- iter != _slave_tablet_nodes.end(); iter++) {
- PSlaveTabletNodes slave_tablet_nodes;
- for (auto node_id : iter->second) {
- auto node = _parent->_nodes_info->find_node(node_id);
- if (node == nullptr) {
- return;
- }
- PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
- pnode->set_id(node->id);
- pnode->set_option(node->option);
- pnode->set_host(node->host);
- pnode->set_async_internal_port(node->brpc_port);
- }
- request.mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes});
- }
- }
-
- // eos request must be the last request
- _add_block_closure->end_mark();
- _send_finished = true;
- CHECK(_pending_batches_num == 0) << _pending_batches_num;
- }
-
- if (_parent->_transfer_large_data_by_brpc && request.has_block() &&
- request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
- Status st = request_embed_attachment_contain_block<
- PTabletWriterAddBlockRequest, ReusableClosure<PTabletWriterAddBlockResult>>(
- &request, _add_block_closure);
- if (!st.ok()) {
- cancel(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
- _add_block_closure->clear_in_flight();
- return;
- }
- std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port);
- std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
- _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
- "http");
- _add_block_closure->cntl.http_request().uri() =
- brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
- _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
- _add_block_closure->cntl.http_request().set_content_type("application/json");
- _brpc_http_stub->tablet_writer_add_block_by_http(
- &_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure);
- } else {
- _add_block_closure->cntl.http_request().Clear();
- _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
- &_add_block_closure->result, _add_block_closure);
- }
-
- _next_packet_seq++;
-}
-
-void VNodeChannel::_close_check() {
- std::lock_guard<std::mutex> lg(_pending_batches_lock);
- CHECK(_pending_blocks.empty()) << name();
- CHECK(_cur_mutable_block == nullptr) << name();
-}
-
-void VNodeChannel::mark_close() {
- auto st = none_of({_cancelled, _eos_is_produced});
- if (!st.ok()) {
- return;
- }
-
- _cur_add_block_request.set_eos(true);
- {
- debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- std::lock_guard<std::mutex> l(_pending_batches_lock);
- _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
- _pending_batches_num++;
- DCHECK(_pending_blocks.back().second.eos());
- _close_time_ms = UnixMillis();
- LOG(INFO) << channel_info()
- << " mark closed, left pending batch size: " <<
_pending_blocks.size();
- }
-
- _eos_is_produced = true;
-}
-
->>>>>>> 569ab3055 ([bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) (#12362))
VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
: OlapTableSink(pool, row_desc, texprs, status) {
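
The deleted hunk is a stray, unresolved merge-conflict block: the <<<<<<< HEAD / ======= / >>>>>>> 569ab3055 markers plus the duplicate copy of the VNodeChannel implementation between them, apparently left behind when the original fix (#12359, #12362) was merged into dev-1.1.2. The HEAD side of the conflict is empty, so only the duplicate is removed. The duplicated code itself shows the mechanism the commit subject refers to: add_block() sleeps while _pending_batches_bytes exceeds _max_pending_batches_bytes, so a slow backend applies backpressure to the producer instead of letting unsent blocks pile up until the process runs out of memory. Below is a minimal standalone C++ sketch of that bounded-pending-queue pattern; every name in it is hypothetical, and it is not the Doris implementation, only the shape of the fix.

// Illustrative sketch only (hypothetical names, not Doris code): a pending
// queue where the producer sleeps while buffered bytes exceed a cap,
// mirroring the backpressure wait in the removed VNodeChannel::add_block().
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

class BoundedPendingQueue {
public:
    explicit BoundedPendingQueue(size_t max_pending_bytes)
            : _max_pending_bytes(max_pending_bytes) {}

    // Producer: sleep while too many bytes are buffered, so unsent
    // batches cannot grow without bound (the OOM in the subject line).
    void push(std::string block) {
        while (!_cancelled && _pending_bytes > _max_pending_bytes) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::lock_guard<std::mutex> l(_lock);
        _pending_bytes += block.size();
        _pending.push(std::move(block));
    }

    // Consumer: pop one batch and release its byte quota.
    bool try_pop(std::string* out) {
        std::lock_guard<std::mutex> l(_lock);
        if (_pending.empty()) return false;
        *out = std::move(_pending.front());
        _pending.pop();
        _pending_bytes -= out->size();
        return true;
    }

    void cancel() { _cancelled = true; }

private:
    std::mutex _lock;
    std::queue<std::string> _pending;
    std::atomic<size_t> _pending_bytes{0};
    std::atomic<bool> _cancelled{false};
    const size_t _max_pending_bytes;
};

int main() {
    BoundedPendingQueue queue(64); // tiny cap so backpressure kicks in
    std::thread consumer([&queue] {
        std::string block;
        for (int popped = 0; popped < 8;) {
            if (queue.try_pop(&block)) ++popped;
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }
    });
    for (int i = 0; i < 8; ++i) {
        queue.push(std::string(32, 'x')); // stalls once >64 bytes are pending
    }
    consumer.join();
    std::cout << "all batches drained" << std::endl;
    return 0;
}

The real channel layers more on top (per-batch counters, RPC timeouts, cancellation messages), but the bounded wait above is the part that keeps the pending queue from exhausting memory.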
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]