This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 171a110d7901b3a3652ec893528d05ee5206ea52 Author: Yongqiang YANG <[email protected]> AuthorDate: Thu May 25 20:01:06 2023 +0800 [fix](publish) dot use wait_for for publish synchorization (#20029) It leads to use after free problem. --- be/src/olap/task/engine_publish_version_task.cpp | 31 ++++++++++++------------ be/src/olap/task/engine_publish_version_task.h | 12 ++++----- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 8fd438354a..bdea449d35 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -34,7 +34,8 @@ using std::map; EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids, std::vector<TTabletId>* succ_tablet_ids) - : _publish_version_req(publish_version_req), + : _total_task_num(0), + _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), _succ_tablet_ids(succ_tablet_ids) {} @@ -49,13 +50,17 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) { } void EnginePublishVersionTask::wait() { - std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex); - _tablet_finish_sleep_cond.wait_for(lock, std::chrono::milliseconds(10)); + std::unique_lock<std::mutex> lock(_tablet_finish_mutex); + _tablet_finish_cond.wait(lock); } void EnginePublishVersionTask::notify() { - std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex); - _tablet_finish_sleep_cond.notify_one(); + std::unique_lock<std::mutex> lock(_tablet_finish_mutex); + _tablet_finish_cond.notify_one(); +} + +int64_t EnginePublishVersionTask::finish_task() { + return _total_task_num.fetch_sub(1); } Status EnginePublishVersionTask::finish() { @@ -65,7 +70,6 @@ Status EnginePublishVersionTask::finish() { VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id; // each partition - std::atomic<int64_t> total_task_num(0); for (auto& par_ver_info : _publish_version_req.partition_version_infos) { int64_t partition_id = par_ver_info.partition_id; // get all partition related tablets and check whether the tablet have the related version @@ -141,10 +145,9 @@ Status EnginePublishVersionTask::finish() { continue; } } - total_task_num.fetch_add(1); + _total_task_num.fetch_add(1); auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>( - this, tablet, rowset, partition_id, transaction_id, version, tablet_info, - &total_task_num); + this, tablet, rowset, partition_id, transaction_id, version, tablet_info); auto submit_st = StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func( [=]() { tablet_publish_txn_ptr->handle(); }); @@ -152,7 +155,7 @@ Status EnginePublishVersionTask::finish() { } } // wait for all publish txn finished - while (total_task_num.load() != 0) { + while (_total_task_num.load() != 0) { wait(); } @@ -196,20 +199,18 @@ Status EnginePublishVersionTask::finish() { TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, - Version version, const TabletInfo& tablet_info, - std::atomic<int64_t>* total_task_num) + Version version, const TabletInfo& tablet_info) : _engine_publish_version_task(engine_task), _tablet(tablet), _rowset(rowset), _partition_id(partition_id), _transaction_id(transaction_id), _version(version), - _tablet_info(tablet_info), - _total_task_num(total_task_num) {} + _tablet_info(tablet_info) {} void TabletPublishTxnTask::handle() { Defer defer {[&] { - if (_total_task_num->fetch_sub(1) == 1) { + if (_engine_publish_version_task->finish_task() == 1) { _engine_publish_version_task->notify(); } }}; diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 959584d116..0a0de3efc0 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -29,8 +29,7 @@ class TabletPublishTxnTask { public: TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, - Version version, const TabletInfo& tablet_info, - std::atomic<int64_t>* total_task_num); + Version version, const TabletInfo& tablet_info); ~TabletPublishTxnTask() {} void handle(); @@ -44,8 +43,6 @@ private: int64_t _transaction_id; Version _version; TabletInfo _tablet_info; - - std::atomic<int64_t>* _total_task_num; }; class EnginePublishVersionTask : public EngineTask { @@ -63,14 +60,17 @@ public: void notify(); void wait(); + int64_t finish_task(); + private: + std::atomic<int64_t> _total_task_num; const TPublishVersionRequest& _publish_version_req; std::mutex _tablet_ids_mutex; vector<TTabletId>* _error_tablet_ids; vector<TTabletId>* _succ_tablet_ids; - std::mutex _tablet_finish_sleep_mutex; - std::condition_variable _tablet_finish_sleep_cond; + std::mutex _tablet_finish_mutex; + std::condition_variable _tablet_finish_cond; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
