This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 522cefb0b34 [chore](cloud) Add async wrap for `bthread_fork_join` with 
promise-future (#52816)
522cefb0b34 is described below

commit 522cefb0b341c06078d83bdbb9cbfdea5e486804
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Jul 7 10:30:19 2025 +0800

    [chore](cloud) Add async wrap for `bthread_fork_join` with promise-future 
(#52816)
---
 be/src/cloud/cloud_meta_mgr.cpp       | 32 ++++++++++---
 be/src/cloud/cloud_meta_mgr.h         |  9 ++++
 be/test/cloud/cloud_meta_mgr_test.cpp | 89 +++++++++++++++++++++++++++++++++++
 3 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 32af870af29..fd6a66eb60a 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -69,6 +69,13 @@ namespace doris::cloud {
 #include "common/compile_check_begin.h"
 using namespace ErrorCode;
 
+// Entry point handed to `bthread_start_background`.
+// `arg` must point to a heap-allocated `std::function<void()>`. This function takes ownership
+// of it, invokes it once and destroys it. Always returns nullptr.
+void* run_bthread_work(void* arg) {
+    // Own the closure through unique_ptr so that it is released on every exit path.
+    std::unique_ptr<std::function<void()>> work(static_cast<std::function<void()>*>(arg));
+    (*work)();
+    return nullptr;
+}
+
 Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, 
int concurrency) {
     if (tasks.empty()) {
         return Status::OK();
@@ -79,13 +86,6 @@ Status bthread_fork_join(const 
std::vector<std::function<Status()>>& tasks, int
     Status status; // Guard by lock
     int count = 0; // Guard by lock
 
-    auto* run_bthread_work = +[](void* arg) -> void* {
-        auto* f = reinterpret_cast<std::function<void()>*>(arg);
-        (*f)();
-        delete f;
-        return nullptr;
-    };
-
     for (const auto& task : tasks) {
         {
             std::unique_lock lk(lock);
@@ -131,6 +131,24 @@ Status bthread_fork_join(const 
std::vector<std::function<Status()>>& tasks, int
     return status;
 }
 
+// Asynchronous flavor of `bthread_fork_join(tasks, concurrency)`.
+// The fork-join itself runs in a background bthread and its final Status is published through
+// `*fut`. Returns OK once the background bthread has been started, an error otherwise.
+// If starting the bthread fails, the promise is destroyed unfulfilled, so calling `fut->get()`
+// afterwards would throw std::future_error instead of yielding a Status.
+Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
+                         std::future<Status>* fut) {
+    if (fut == nullptr) {
+        return Status::InternalError<false>("fut must not be nullptr");
+    }
+    // std::promise is move-only while std::function requires a copyable callable, so the promise
+    // lives on the heap and the closure only holds a shared_ptr to it.
+    auto prom = std::make_shared<std::promise<Status>>();
+    *fut = prom->get_future();
+    // `tasks` is captured by value on purpose: the closure runs after this function has returned,
+    // when the caller's vector may already be gone, so a reference capture could dangle.
+    auto fn = std::make_unique<std::function<void()>>(
+            [tasks, concurrency, p = std::move(prom)]() {
+                p->set_value(bthread_fork_join(tasks, concurrency));
+            });
+
+    bthread_t bthread_id;
+    if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn.get()) != 0) {
+        // `fn` is still owned here and is destroyed on return.
+        return Status::InternalError<false>("failed to create bthread");
+    }
+    // Ownership has been handed over to `run_bthread_work`, which deletes the closure.
+    (void)fn.release();
+    return Status::OK();
+}
+
 namespace {
 constexpr int kBrpcRetryTimes = 3;
 
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 962d0b06dd1..deed11bb283 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -18,6 +18,7 @@
 
 #include <gen_cpp/olap_file.pb.h>
 
+#include <future>
 #include <memory>
 #include <string>
 #include <tuple>
@@ -50,8 +51,16 @@ class TabletIndexPB;
 using StorageVaultInfos = std::vector<
         std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>, 
StorageVaultPB_PathFormat>>;
 
+// Runs `tasks` in bthreads with the given concurrency and waits until all started tasks are done.
+// It stops starting new tasks as soon as any task returns a non-OK status, so some tasks may be
+// left untouched.
+// Returns OK if all tasks finish successfully, otherwise returns the result of the failed task.
 Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, 
int concurrency);
 
+// An async wrapper, based on promise/future, of the `bthread_fork_join` declared above.
+// Returns OK if `fut` was successfully created, otherwise returns an error.
+Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, 
int concurrency,
+                         std::future<Status>* fut);
+
 class CloudMetaMgr {
 public:
     CloudMetaMgr() = default;
diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp 
b/be/test/cloud/cloud_meta_mgr_test.cpp
new file mode 100644
index 00000000000..43611c6e4e4
--- /dev/null
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_meta_mgr.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <memory>
+
+namespace doris {
+using namespace cloud;
+using namespace std::chrono;
+
+// Stateless fixture: it only groups the tests of the free functions from cloud_meta_mgr.h.
+// No per-test set-up or tear-down is needed, so the base-class defaults are used.
+class CloudMetaMgrTest : public testing::Test {};
+
+// Checks both the blocking and the future-based `bthread_fork_join` through wall-clock timing:
+// every task sleeps 20ms and the concurrency is 3.
+TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
+    // Milliseconds elapsed since `since`.
+    const auto elapsed_ms = [](steady_clock::time_point since) {
+        return duration_cast<milliseconds>(steady_clock::now() - since).count();
+    };
+
+    // 7 tasks that each sleep 20ms and then succeed.
+    std::vector<std::function<Status()>> tasks(7, [] {
+        bthread_usleep(20000);
+        return Status::OK();
+    });
+
+    // Blocking call, all tasks succeed: 7 tasks cannot fit in fewer than 2 rounds.
+    {
+        const auto begin = steady_clock::now();
+        EXPECT_TRUE(bthread_fork_join(tasks, 3).ok());
+        EXPECT_GT(elapsed_ms(begin), 40);
+    }
+    // Async call, all tasks succeed: the call returns immediately, the future resolves later.
+    {
+        std::future<Status> fut;
+        const auto begin = steady_clock::now();
+        EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok());
+        EXPECT_LE(elapsed_ms(begin), 40);
+        EXPECT_TRUE(fut.get().ok());
+        EXPECT_GT(elapsed_ms(begin), 40);
+    }
+
+    // Put a failing task in front so that the first batch fails fast.
+    tasks.insert(tasks.begin(), [] {
+        bthread_usleep(20000);
+        return Status::InternalError<false>("error");
+    });
+
+    // Blocking call with a failure in the first round: at most one round is executed.
+    {
+        const auto begin = steady_clock::now();
+        EXPECT_FALSE(bthread_fork_join(tasks, 3).ok());
+        EXPECT_LE(elapsed_ms(begin), 40);
+    }
+    // Async call with a failure in the first round: returns immediately and fails after one round.
+    {
+        std::future<Status> fut;
+        const auto begin = steady_clock::now();
+        EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok());
+        EXPECT_LE(elapsed_ms(begin), 40);
+        EXPECT_FALSE(fut.get().ok());
+        EXPECT_LE(elapsed_ms(begin), 40);
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to