This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit abb3d0c08c803d7c24c9b51aec0f3d5ee76397e6 Author: Mingyu Chen <[email protected]> AuthorDate: Fri Mar 18 09:38:16 2022 +0800 [fix](load) fix bug that BE may crash when calling `mark_as_failed` (#8501) 1. The methods of the IndexChannel are called back in the RpcClosure in the NodeChannel. However, this callback may occur after the whole task is finished (e.g. due to network latency), and by that time the IndexChannel may have already been destroyed, so we must not call the IndexChannel's methods anymore; otherwise the BE will crash. Therefore, we use the `_is_closed` variable and `_closed_lock` to ensure that the RPC callback function will not call the IndexChannel's methods after the NodeChannel is closed. 2. Do not add the IndexChannel to the ObjectPool, because destroying the IndexChannel may trigger the destruction of its NodeChannels, and destroying a NodeChannel can be time-consuming (it waits for outstanding RPCs to finish). The ObjectPool holds a SpinLock while destroying its objects, so this could keep the CPU busy spinning. 
--- be/src/exec/tablet_sink.cpp | 32 +++++++++++++++++++++++--------- be/src/exec/tablet_sink.h | 15 +++++++++++++-- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index c1c5cde..0a56369 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -34,6 +34,7 @@ #include "service/brpc.h" #include "util/brpc_client_cache.h" #include "util/debug/sanitizer_scopes.h" +#include "util/defer_op.h" #include "util/monotime.h" #include "util/proto_util.h" #include "util/threadpool.h" @@ -183,6 +184,12 @@ Status NodeChannel::open_wait() { // add batch closure _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create(); _add_batch_closure->addFailedHandler([this](bool is_last_rpc) { + std::lock_guard<std::mutex> l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call `mark_as_failed`, + // and notice that _index_channel may already be destroyed. + return; + } // If rpc failed, mark all tablets on this node channel as failed _index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1); Status st = _index_channel->check_intolerable_failure(); @@ -197,6 +204,12 @@ Status NodeChannel::open_wait() { _add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { + std::lock_guard<std::mutex> l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call the following logic, + // and notice that _index_channel may already be destroyed. 
+ return; + } Status status(result.status()); if (status.ok()) { // if has error tablet, handle them first @@ -329,15 +342,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { return Status::OK(); } -Status NodeChannel::mark_close() { +void NodeChannel::mark_close() { auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { - if (_cancelled) { - std::lock_guard<SpinLock> l(_cancel_msg_lock); - return Status::InternalError("mark close failed. " + _cancel_msg); - } else { - return st.clone_and_prepend("already stopped, can't mark as closed. cancelled/eos: "); - } + return; } _cur_add_batch_request.set_eos(true); @@ -354,10 +362,16 @@ Status NodeChannel::mark_close() { } _eos_is_produced = true; - return Status::OK(); + return; } Status NodeChannel::close_wait(RuntimeState* state) { + // set _is_closed to true finally + Defer set_closed {[&]() { + std::lock_guard<std::mutex> l(_closed_lock); + _is_closed = true; + }}; + auto st = none_of({_cancelled, !_eos_is_produced}); if (!st.ok()) { if (_cancelled) { @@ -809,7 +823,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { tablets.emplace_back(std::move(tablet_with_partition)); } } - auto channel = _pool->add(new IndexChannel(this, index->index_id)); + auto channel = std::make_shared<IndexChannel>(this, index->index_id); RETURN_IF_ERROR(channel->init(state, tablets)); _channels.emplace_back(channel); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 5c0330c..b5c10b7 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -170,7 +170,7 @@ public: // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. 
just cancel() - Status mark_close(); + void mark_close(); Status close_wait(RuntimeState* state); void cancel(const std::string& cancel_msg); @@ -284,6 +284,17 @@ private: // the timestamp when this node channel be marked closed and finished closed uint64_t _close_time_ms = 0; + + // lock to protect _is_closed. + // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel. + // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency), + // and by that time the IndexChannel may have been destructured, so we should not call the + // IndexChannel methods anymore, otherwise the BE will crash. + // Therefore, we use the _is_closed and _closed_lock to ensure that the RPC callback + // function will not call the IndexChannel method after the NodeChannel is closed. + // The IndexChannel is definitely accessible until the NodeChannel is closed. + std::mutex _closed_lock; + bool _is_closed = false; }; class IndexChannel { @@ -425,7 +436,7 @@ protected: Bitmap _filter_bitmap; // index_channel - std::vector<IndexChannel*> _channels; + std::vector<std::shared_ptr<IndexChannel>> _channels; CountDownLatch _stop_background_threads_latch; scoped_refptr<Thread> _sender_thread; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
