This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 31c4f2c961d1cf27f7450695c41b9df3bcf6b14c Author: Xin Liao <[email protected]> AuthorDate: Thu Jun 29 20:58:47 2023 +0800 [fix](merge-on-write) fix dead lock when publish (#21339) --- be/src/olap/task/engine_publish_version_task.cpp | 37 +++++------------------- be/src/olap/task/engine_publish_version_task.h | 7 ----- 2 files changed, 7 insertions(+), 37 deletions(-) diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 02af6bf674..249a7a424c 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -72,8 +72,7 @@ EnginePublishVersionTask::EnginePublishVersionTask( const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids, std::vector<TTabletId>* succ_tablet_ids, std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets) - : _total_task_num(0), - _publish_version_req(publish_version_req), + : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), _succ_tablet_ids(succ_tablet_ids), _discontinuous_version_tablets(discontinuous_version_tablets) {} @@ -88,25 +87,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) { _succ_tablet_ids->push_back(tablet_id); } -void EnginePublishVersionTask::wait() { - std::unique_lock<std::mutex> lock(_tablet_finish_mutex); - _tablet_finish_cond.wait(lock); -} - -void EnginePublishVersionTask::notify() { - std::unique_lock<std::mutex> lock(_tablet_finish_mutex); - _tablet_finish_cond.notify_one(); -} - -int64_t EnginePublishVersionTask::finish_task() { - return _total_task_num.fetch_sub(1); -} - Status EnginePublishVersionTask::finish() { Status res = Status::OK(); int64_t transaction_id = _publish_version_req.transaction_id; OlapStopWatch watch; VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id; + std::unique_ptr<ThreadPoolToken> token = + StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( + ThreadPool::ExecutionMode::CONCURRENT); // each partition for (auto& par_ver_info : _publish_version_req.partition_version_infos) { @@ -187,19 +175,13 @@ Status EnginePublishVersionTask::finish() { continue; } } - _total_task_num.fetch_add(1); auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>( this, tablet, rowset, partition_id, transaction_id, version, tablet_info); - auto submit_st = - StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func( - [=]() { tablet_publish_txn_ptr->handle(); }); + auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); }); CHECK(submit_st.ok()) << submit_st; } } - // wait for all publish txn finished - while (_total_task_num.load() != 0) { - wait(); - } + token->wait(); // check if the related tablet remained all have the version for (auto& par_ver_info : _publish_version_req.partition_version_infos) { @@ -260,12 +242,7 @@ void TabletPublishTxnTask::handle() { _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); return; } - Defer defer {[&] { - _rowset->finish_publish(); - if (_engine_publish_version_task->finish_task() == 1) { - _engine_publish_version_task->notify(); - } - }}; + Defer defer {[&] { _rowset->finish_publish(); }}; auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn( _partition_id, _tablet, _transaction_id, _version, &_stats); if (publish_status != Status::OK()) { diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index c8a68dedea..8acf8099ca 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -93,21 +93,14 @@ public: void add_error_tablet_id(int64_t tablet_id); void add_succ_tablet_id(int64_t tablet_id); - void notify(); - void wait(); - int64_t finish_task(); private: - std::atomic<int64_t> _total_task_num; const TPublishVersionRequest& _publish_version_req; std::mutex _tablet_ids_mutex; vector<TTabletId>* _error_tablet_ids; vector<TTabletId>* _succ_tablet_ids; std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets; - - std::mutex _tablet_finish_mutex; - std::condition_variable _tablet_finish_cond; }; class AsyncTabletPublishTask { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
