This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2c4011e4f [fix](scan) Fix missing sync rowsets in cloud mode (#31717)
fb2c4011e4f is described below

commit fb2c4011e4fc3240d537a2eae19833ccf377af6b
Author: plat1ko <[email protected]>
AuthorDate: Mon Mar 4 16:08:01 2024 +0800

    [fix](scan) Fix missing sync rowsets in cloud mode (#31717)
---
 be/src/cloud/cloud_meta_mgr.cpp             | 13 ++++++-------
 be/src/common/sync_point.h                  |  4 ++--
 be/src/olap/parallel_scanner_builder.cpp    | 15 +++++++++++++++
 be/src/pipeline/exec/olap_scan_operator.cpp | 17 +++++++++++++++--
 4 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c3eb63bdb01..112930b6f34 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -74,10 +74,7 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
         return nullptr;
     };
 
-    std::vector<bthread_t> bthread_ids;
-    bthread_ids.resize(tasks.size());
-    for (int task_idx = 0; task_idx < tasks.size(); ++task_idx) {
-        auto* task = &(tasks[task_idx]);
+    for (const auto& task : tasks) {
         {
             std::unique_lock lk(lock);
             // Wait until there are available slots
@@ -93,8 +90,8 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
         }
 
         // dispatch task into bthreads
-        auto* fn = new std::function<void()>([&, task] {
-            auto st = (*task)();
+        auto* fn = new std::function<void()>([&, &task = task] {
+            auto st = task();
             {
                 std::lock_guard lk(lock);
                 --count;
@@ -104,7 +101,9 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
                 cond.notify_one();
             }
         });
-        if (bthread_start_background(&bthread_ids[task_idx], nullptr, 
run_bthread_work, fn) != 0) {
+
+        bthread_t bthread_id;
+        if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, 
fn) != 0) {
             run_bthread_work(fn);
         }
     }
diff --git a/be/src/common/sync_point.h b/be/src/common/sync_point.h
index 18b3a63c05e..9cddce96e4c 100644
--- a/be/src/common/sync_point.h
+++ b/be/src/common/sync_point.h
@@ -155,7 +155,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
 { \
   std::pair ret {default_ret_val, false}; \
   std::vector<std::any> args {__VA_ARGS__}; \
-  args.push_back(&ret); \
+  args.emplace_back(&ret); \
   doris::SyncPoint::get_instance()->process(x, std::move(args)); \
   if (ret.second) return std::move(ret.first); \
 }
@@ -163,7 +163,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
 { \
   bool pred = false; \
   std::vector<std::any> args {__VA_ARGS__}; \
-  args.push_back(&pred); \
+  args.emplace_back(&pred); \
   doris::SyncPoint::get_instance()->process(x, std::move(args)); \
   if (pred) return; \
 }
diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp
index e7f51881288..e9797d1a40a 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -17,6 +17,9 @@
 
 #include "parallel_scanner_builder.h"
 
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
 #include "olap/rowset/beta_rowset.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "vec/exec/scan/new_olap_scanner.h"
@@ -40,6 +43,18 @@ template <typename ParentType>
 Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
         std::list<VScannerSPtr>& scanners) {
     DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
+
+    if (config::is_cloud_mode()) {
+        std::vector<std::function<Status()>> tasks;
+        tasks.reserve(_tablets.size());
+        for (auto&& [tablet, version] : _tablets) {
+            tasks.emplace_back([tablet, version]() {
+                return 
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+            });
+        }
+        RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+    }
+
     for (auto&& [tablet, version] : _tablets) {
         DCHECK(_all_rowsets.contains(tablet->tablet_id()));
         auto& rowsets = _all_rowsets[tablet->tablet_id()];
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index 0aab714449e..cc9270f0809 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -21,6 +21,9 @@
 
 #include <memory>
 
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
 #include "olap/parallel_scanner_builder.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -277,8 +280,18 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
             int64_t version = 0;
             std::from_chars(scan_range->version.data(),
                             scan_range->version.data() + 
scan_range->version.size(), version);
-            tablets.emplace_back(
-                    TabletWithVersion 
{std::dynamic_pointer_cast<Tablet>(tablet), version});
+            tablets.emplace_back(TabletWithVersion {std::move(tablet), 
version});
+        }
+
+        if (config::is_cloud_mode()) {
+            std::vector<std::function<Status()>> tasks;
+            tasks.reserve(tablets.size());
+            for (auto&& [tablet, version] : tablets) {
+                tasks.emplace_back([tablet, version]() {
+                    return 
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+                });
+            }
+            RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
         }
 
         if (is_dup_mow_key) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to