This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 522cefb0b34 [chore](cloud) Add async wrap for `bthread_fork_join` with
promise-future (#52816)
522cefb0b34 is described below
commit 522cefb0b341c06078d83bdbb9cbfdea5e486804
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Jul 7 10:30:19 2025 +0800
[chore](cloud) Add async wrap for `bthread_fork_join` with promise-future
(#52816)
---
be/src/cloud/cloud_meta_mgr.cpp | 32 ++++++++++---
be/src/cloud/cloud_meta_mgr.h | 9 ++++
be/test/cloud/cloud_meta_mgr_test.cpp | 89 +++++++++++++++++++++++++++++++++++
3 files changed, 123 insertions(+), 7 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 32af870af29..fd6a66eb60a 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -69,6 +69,13 @@ namespace doris::cloud {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
// bthread entry point (the function handed to bthread_start_background).
// `arg` must point to a std::function<void()> allocated with `new`; this function
// takes ownership of it, invokes it exactly once, and deletes it afterwards --
// also when the call exits by exception. Always returns nullptr.
void* run_bthread_work(void* arg) {
    // void* -> T* is a static_cast; unique_ptr frees the closure on every exit path.
    std::unique_ptr<std::function<void()>> work(static_cast<std::function<void()>*>(arg));
    (*work)();
    return nullptr;
}
+
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks,
int concurrency) {
if (tasks.empty()) {
return Status::OK();
@@ -79,13 +86,6 @@ Status bthread_fork_join(const
std::vector<std::function<Status()>>& tasks, int
Status status; // Guard by lock
int count = 0; // Guard by lock
- auto* run_bthread_work = +[](void* arg) -> void* {
- auto* f = reinterpret_cast<std::function<void()>*>(arg);
- (*f)();
- delete f;
- return nullptr;
- };
-
for (const auto& task : tasks) {
{
std::unique_lock lk(lock);
@@ -131,6 +131,24 @@ Status bthread_fork_join(const
std::vector<std::function<Status()>>& tasks, int
return status;
}
+Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks,
int concurrency,
+ std::future<Status>* fut) {
+ // std::function will cause `copy`, we need to use heap memory to avoid
copy ctor called
+ auto prom = std::make_shared<std::promise<Status>>();
+ *fut = prom->get_future();
+ std::function<void()>* fn =
+ new std::function<void()>([&tasks, concurrency, p =
std::move(prom)]() mutable {
+ p->set_value(bthread_fork_join(tasks, concurrency));
+ });
+
+ bthread_t bthread_id;
+ if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn)
!= 0) {
+ delete fn;
+ return Status::InternalError<false>("failed to create bthread");
+ }
+ return Status::OK();
+}
+
namespace {
constexpr int kBrpcRetryTimes = 3;
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 962d0b06dd1..deed11bb283 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -18,6 +18,7 @@
#include <gen_cpp/olap_file.pb.h>
+#include <future>
#include <memory>
#include <string>
#include <tuple>
@@ -50,8 +51,16 @@ class TabletIndexPB;
using StorageVaultInfos = std::vector<
std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>,
StorageVaultPB_PathFormat>>;
+// Runs `tasks` in bthreads, keeping at most `concurrency` of them running at once, and waits
+// until all of them are done.
+// If any task returns a non-OK status, no further tasks are started, so some tasks may never run.
+// Returns OK if all tasks succeed; otherwise returns the status of the failed task.
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks,
int concurrency);
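+// Asynchronous wrapper of the `bthread_fork_join` declared above, built on std::promise/std::future.
+// Returns OK if the background bthread was started successfully, in which case `*fut` yields the
+// result of the fork-join; otherwise returns an error.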
+Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks,
int concurrency,
+ std::future<Status>* fut);
+
class CloudMetaMgr {
public:
CloudMetaMgr() = default;
diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp
b/be/test/cloud/cloud_meta_mgr_test.cpp
new file mode 100644
index 00000000000..43611c6e4e4
--- /dev/null
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_meta_mgr.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <memory>
+
+namespace doris {
+using namespace cloud;
+using namespace std::chrono;
+
+class CloudMetaMgrTest : public testing::Test {
+ void SetUp() override {}
+ void TearDown() override {}
+};
+
+TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
+ // clang-format off
+ std::vector<std::function<Status()>> tasks {
+ []{ bthread_usleep(20000); return Status::OK(); },
+ []{ bthread_usleep(20000); return Status::OK(); },
+ []{ bthread_usleep(20000); return Status::OK(); },
+ []{ bthread_usleep(20000); return Status::OK(); },
+ []{ bthread_usleep(20000); return Status::OK(); },
+ []{ bthread_usleep(20000); return Status::OK(); },
+ []{ bthread_usleep(20000); return Status::OK(); },
+ };
+ {
+ auto start = steady_clock::now();
+ EXPECT_TRUE(bthread_fork_join(tasks, 3).ok());
+ auto end = steady_clock::now();
+ auto elapsed = duration_cast<milliseconds>(end - start).count();
+ EXPECT_GT(elapsed, 40); // at least 2 rounds running for 7 tasks
+ }
+ {
+ std::future<Status> fut;
+ auto start = steady_clock::now();
+ EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return
immediately
+ auto end = steady_clock::now();
+ auto elapsed = duration_cast<milliseconds>(end - start).count();
+ EXPECT_LE(elapsed, 40); // async
+ EXPECT_TRUE(fut.get().ok());
+ end = steady_clock::now();
+ elapsed = duration_cast<milliseconds>(end - start).count();
+ EXPECT_GT(elapsed, 40); // at least 2 rounds running for 7 tasks
+ }
+
+ // make the first batch fail fast
+ tasks.insert(tasks.begin(), []{ bthread_usleep(20000); return
Status::InternalError<false>("error"); });
+ {
+ auto start = steady_clock::now();
+ EXPECT_FALSE(bthread_fork_join(tasks, 3).ok());
+ auto end = steady_clock::now();
+ auto elapsed = duration_cast<milliseconds>(end - start).count();
+ EXPECT_LE(elapsed, 40); // at most 1 round running for 7 tasks
+ }
+ {
+ std::future<Status> fut;
+ auto start = steady_clock::now();
+ EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return
immediately
+ auto end = steady_clock::now();
+ auto elapsed = duration_cast<milliseconds>(end - start).count();
+ EXPECT_LE(elapsed, 40); // async
+ EXPECT_FALSE(fut.get().ok());
+ end = steady_clock::now();
+ elapsed = duration_cast<milliseconds>(end - start).count();
+ EXPECT_LE(elapsed, 40); // at most 1 round running for 7 tasks
+ }
+ // clang-format on
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]