This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 171a110d7901b3a3652ec893528d05ee5206ea52
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu May 25 20:01:06 2023 +0800

    [fix](publish) do not use wait_for for publish synchronization (#20029)
    
    It leads to a use-after-free problem.
---
 be/src/olap/task/engine_publish_version_task.cpp | 31 ++++++++++++------------
 be/src/olap/task/engine_publish_version_task.h   | 12 ++++-----
 2 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 8fd438354a..bdea449d35 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -34,7 +34,8 @@ using std::map;
 EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& 
publish_version_req,
                                                    std::vector<TTabletId>* 
error_tablet_ids,
                                                    std::vector<TTabletId>* 
succ_tablet_ids)
-        : _publish_version_req(publish_version_req),
+        : _total_task_num(0),
+          _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
           _succ_tablet_ids(succ_tablet_ids) {}
 
@@ -49,13 +50,17 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t 
tablet_id) {
 }
 
 void EnginePublishVersionTask::wait() {
-    std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex);
-    _tablet_finish_sleep_cond.wait_for(lock, std::chrono::milliseconds(10));
+    std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
+    _tablet_finish_cond.wait(lock);
 }
 
 void EnginePublishVersionTask::notify() {
-    std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex);
-    _tablet_finish_sleep_cond.notify_one();
+    std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
+    _tablet_finish_cond.notify_one();
+}
+
+int64_t EnginePublishVersionTask::finish_task() {
+    return _total_task_num.fetch_sub(1);
 }
 
 Status EnginePublishVersionTask::finish() {
@@ -65,7 +70,6 @@ Status EnginePublishVersionTask::finish() {
     VLOG_NOTICE << "begin to process publish version. transaction_id=" << 
transaction_id;
 
     // each partition
-    std::atomic<int64_t> total_task_num(0);
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
         int64_t partition_id = par_ver_info.partition_id;
         // get all partition related tablets and check whether the tablet have 
the related version
@@ -141,10 +145,9 @@ Status EnginePublishVersionTask::finish() {
                     continue;
                 }
             }
-            total_task_num.fetch_add(1);
+            _total_task_num.fetch_add(1);
             auto tablet_publish_txn_ptr = 
std::make_shared<TabletPublishTxnTask>(
-                    this, tablet, rowset, partition_id, transaction_id, 
version, tablet_info,
-                    &total_task_num);
+                    this, tablet, rowset, partition_id, transaction_id, 
version, tablet_info);
             auto submit_st =
                     
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
                             [=]() { tablet_publish_txn_ptr->handle(); });
@@ -152,7 +155,7 @@ Status EnginePublishVersionTask::finish() {
         }
     }
     // wait for all publish txn finished
-    while (total_task_num.load() != 0) {
+    while (_total_task_num.load() != 0) {
         wait();
     }
 
@@ -196,20 +199,18 @@ Status EnginePublishVersionTask::finish() {
 TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* 
engine_task,
                                            TabletSharedPtr tablet, 
RowsetSharedPtr rowset,
                                            int64_t partition_id, int64_t 
transaction_id,
-                                           Version version, const TabletInfo& 
tablet_info,
-                                           std::atomic<int64_t>* 
total_task_num)
+                                           Version version, const TabletInfo& 
tablet_info)
         : _engine_publish_version_task(engine_task),
           _tablet(tablet),
           _rowset(rowset),
           _partition_id(partition_id),
           _transaction_id(transaction_id),
           _version(version),
-          _tablet_info(tablet_info),
-          _total_task_num(total_task_num) {}
+          _tablet_info(tablet_info) {}
 
 void TabletPublishTxnTask::handle() {
     Defer defer {[&] {
-        if (_total_task_num->fetch_sub(1) == 1) {
+        if (_engine_publish_version_task->finish_task() == 1) {
             _engine_publish_version_task->notify();
         }
     }};
diff --git a/be/src/olap/task/engine_publish_version_task.h 
b/be/src/olap/task/engine_publish_version_task.h
index 959584d116..0a0de3efc0 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -29,8 +29,7 @@ class TabletPublishTxnTask {
 public:
     TabletPublishTxnTask(EnginePublishVersionTask* engine_task, 
TabletSharedPtr tablet,
                          RowsetSharedPtr rowset, int64_t partition_id, int64_t 
transaction_id,
-                         Version version, const TabletInfo& tablet_info,
-                         std::atomic<int64_t>* total_task_num);
+                         Version version, const TabletInfo& tablet_info);
     ~TabletPublishTxnTask() {}
 
     void handle();
@@ -44,8 +43,6 @@ private:
     int64_t _transaction_id;
     Version _version;
     TabletInfo _tablet_info;
-
-    std::atomic<int64_t>* _total_task_num;
 };
 
 class EnginePublishVersionTask : public EngineTask {
@@ -63,14 +60,17 @@ public:
     void notify();
     void wait();
 
+    int64_t finish_task();
+
 private:
+    std::atomic<int64_t> _total_task_num;
     const TPublishVersionRequest& _publish_version_req;
     std::mutex _tablet_ids_mutex;
     vector<TTabletId>* _error_tablet_ids;
     vector<TTabletId>* _succ_tablet_ids;
 
-    std::mutex _tablet_finish_sleep_mutex;
-    std::condition_variable _tablet_finish_sleep_cond;
+    std::mutex _tablet_finish_mutex;
+    std::condition_variable _tablet_finish_cond;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to