This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fb2c4011e4f [fix](scan) Fix missing sync rowsets in cloud mode (#31717)
fb2c4011e4f is described below
commit fb2c4011e4fc3240d537a2eae19833ccf377af6b
Author: plat1ko <[email protected]>
AuthorDate: Mon Mar 4 16:08:01 2024 +0800
[fix](scan) Fix missing sync rowsets in cloud mode (#31717)
---
be/src/cloud/cloud_meta_mgr.cpp | 13 ++++++-------
be/src/common/sync_point.h | 4 ++--
be/src/olap/parallel_scanner_builder.cpp | 15 +++++++++++++++
be/src/pipeline/exec/olap_scan_operator.cpp | 17 +++++++++++++++--
4 files changed, 38 insertions(+), 11 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c3eb63bdb01..112930b6f34 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -74,10 +74,7 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
return nullptr;
};
- std::vector<bthread_t> bthread_ids;
- bthread_ids.resize(tasks.size());
- for (int task_idx = 0; task_idx < tasks.size(); ++task_idx) {
- auto* task = &(tasks[task_idx]);
+ for (const auto& task : tasks) {
{
std::unique_lock lk(lock);
// Wait until there are available slots
@@ -93,8 +90,8 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
}
// dispatch task into bthreads
- auto* fn = new std::function<void()>([&, task] {
- auto st = (*task)();
+ auto* fn = new std::function<void()>([&, &task = task] {
+ auto st = task();
{
std::lock_guard lk(lock);
--count;
@@ -104,7 +101,9 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
cond.notify_one();
}
});
- if (bthread_start_background(&bthread_ids[task_idx], nullptr, run_bthread_work, fn) != 0) {
+
+ bthread_t bthread_id;
+ if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) {
run_bthread_work(fn);
}
}
diff --git a/be/src/common/sync_point.h b/be/src/common/sync_point.h
index 18b3a63c05e..9cddce96e4c 100644
--- a/be/src/common/sync_point.h
+++ b/be/src/common/sync_point.h
@@ -155,7 +155,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
{ \
std::pair ret {default_ret_val, false}; \
std::vector<std::any> args {__VA_ARGS__}; \
- args.push_back(&ret); \
+ args.emplace_back(&ret); \
doris::SyncPoint::get_instance()->process(x, std::move(args)); \
if (ret.second) return std::move(ret.first); \
}
@@ -163,7 +163,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
{ \
bool pred = false; \
std::vector<std::any> args {__VA_ARGS__}; \
- args.push_back(&pred); \
+ args.emplace_back(&pred); \
doris::SyncPoint::get_instance()->process(x, std::move(args)); \
if (pred) return; \
}
diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp
index e7f51881288..e9797d1a40a 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -17,6 +17,9 @@
#include "parallel_scanner_builder.h"
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
#include "olap/rowset/beta_rowset.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
@@ -40,6 +43,18 @@ template <typename ParentType>
Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
std::list<VScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
+
+ if (config::is_cloud_mode()) {
+ std::vector<std::function<Status()>> tasks;
+ tasks.reserve(_tablets.size());
+ for (auto&& [tablet, version] : _tablets) {
+ tasks.emplace_back([tablet, version]() {
+ return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+ });
+ }
+ RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+ }
+
for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_rowsets.contains(tablet->tablet_id()));
auto& rowsets = _all_rowsets[tablet->tablet_id()];
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index 0aab714449e..cc9270f0809 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -21,6 +21,9 @@
#include <memory>
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
#include "olap/parallel_scanner_builder.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
@@ -277,8 +280,18 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
int64_t version = 0;
std::from_chars(scan_range->version.data(),
scan_range->version.data() + scan_range->version.size(), version);
- tablets.emplace_back(
- TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), version});
+ tablets.emplace_back(TabletWithVersion {std::move(tablet), version});
+ }
+
+ if (config::is_cloud_mode()) {
+ std::vector<std::function<Status()>> tasks;
+ tasks.reserve(tablets.size());
+ for (auto&& [tablet, version] : tablets) {
+ tasks.emplace_back([tablet, version]() {
+ return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+ });
+ }
+ RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}
if (is_dup_mow_key) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]