This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a029a6fa4f3 branch-4.0: [chore](UT) Make cloud mgr test stable #57033 (#57053)
a029a6fa4f3 is described below
commit a029a6fa4f35c7d2efa399e37c09bf9e467ccda8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 17 08:51:02 2025 +0800
branch-4.0: [chore](UT) Make cloud mgr test stable #57033 (#57053)
Cherry-picked from #57033
Co-authored-by: Gavin Chou <[email protected]>
---
be/test/cloud/cloud_meta_mgr_test.cpp | 135 ++++++++++++++++++++++++++--------
1 file changed, 103 insertions(+), 32 deletions(-)
diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp b/be/test/cloud/cloud_meta_mgr_test.cpp
index df3df147959..3f2f7fe4ef6 100644
--- a/be/test/cloud/cloud_meta_mgr_test.cpp
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -21,6 +21,7 @@
#include <chrono>
#include <memory>
+#include <random>
#include <set>
#include "cloud/cloud_storage_engine.h"
@@ -44,57 +45,127 @@ class CloudMetaMgrTest : public testing::Test {
TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
// clang-format off
- std::vector<std::function<Status()>> tasks {
- []{ bthread_usleep(20000); return Status::OK(); },
- []{ bthread_usleep(20000); return Status::OK(); },
- []{ bthread_usleep(20000); return Status::OK(); },
- []{ bthread_usleep(20000); return Status::OK(); },
- []{ bthread_usleep(20000); return Status::OK(); },
- []{ bthread_usleep(20000); return Status::OK(); },
- []{ bthread_usleep(20000); return Status::OK(); },
- };
+ std::atomic<int> task_counter{0};
+ std::atomic<int> concurrent_tasks{0};
+ std::atomic<int> max_concurrent_tasks{0};
+ int num_tasks = 7;
+ int concurrency = 3;
+ int sleep_us = 10000;
+
+ std::mt19937 rng(system_clock::now().time_since_epoch().count());
+ std::uniform_int_distribution<int> dist(0, sleep_us);
+ std::vector<std::function<Status()>> tasks (
+ num_tasks,
+ [&]{
+ int current = ++concurrent_tasks;
+ int old_max = max_concurrent_tasks.load();
+ while (current > old_max && !max_concurrent_tasks.compare_exchange_weak(old_max, current)) {
+ // Update max if needed
+ }
+ bthread_usleep(sleep_us);
+ task_counter.fetch_add(1);
+ --concurrent_tasks;
+ return Status::OK();
+ }
+ );
+
+ // Test synchronous execution
{
auto start = steady_clock::now();
- EXPECT_TRUE(bthread_fork_join(tasks, 3).ok());
+ task_counter.store(0);
+ concurrent_tasks.store(0);
+ max_concurrent_tasks.store(0);
+
+ EXPECT_TRUE(bthread_fork_join(tasks, concurrency).ok());
auto end = steady_clock::now();
- auto elapsed = duration_cast<milliseconds>(end - start).count();
- EXPECT_GT(elapsed, 40); // at least 2 rounds running for 7 tasks
+
+ // All 7 tasks should have completed
+ EXPECT_EQ(task_counter.load(), num_tasks);
+ // With concurrency 3, we should see at most 3 concurrent tasks
+ EXPECT_LE(max_concurrent_tasks.load(), concurrency);
+ EXPECT_GT(max_concurrent_tasks.load(), 0);
+ EXPECT_GE(duration_cast<microseconds>(end - start).count(), (num_tasks * sleep_us / concurrency));
}
+
+ // Test asynchronous execution
{
- std::future<Status> fut;
auto start = steady_clock::now();
+ task_counter.store(0);
+ concurrent_tasks.store(0);
+ max_concurrent_tasks.store(0);
+
+ std::future<Status> fut;
auto t = tasks;
- EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
- auto end = steady_clock::now();
- auto elapsed = duration_cast<milliseconds>(end - start).count();
- EXPECT_LE(elapsed, 40); // async
+ EXPECT_TRUE(bthread_fork_join(std::move(t), concurrency, &fut).ok()); // return immediately
+ EXPECT_LT(task_counter.load(), num_tasks);
+
+ // Initially no tasks should have completed
+ EXPECT_EQ(task_counter.load(), 0);
+
+ // Wait for completion
EXPECT_TRUE(fut.get().ok());
- end = steady_clock::now();
- elapsed = duration_cast<milliseconds>(end - start).count();
- EXPECT_GT(elapsed, 40); // at least 2 rounds running for 7 tasks
+ auto end = steady_clock::now();
+
+ // All 7 tasks should have completed
+ EXPECT_EQ(task_counter.load(), num_tasks);
+ // With concurrency 3, we should see at most 3 concurrent tasks
+ EXPECT_LE(max_concurrent_tasks.load(), concurrency);
+ EXPECT_GT(max_concurrent_tasks.load(), 0);
+ EXPECT_GE(duration_cast<microseconds>(end - start).count(), (num_tasks * sleep_us / concurrency));
}
- // make the first batch fail fast
- tasks.insert(tasks.begin(), []{ bthread_usleep(20000); return Status::InternalError<false>("error"); });
+ // Test error handling - make the first task fail fast
{
auto start = steady_clock::now();
- EXPECT_FALSE(bthread_fork_join(tasks, 3).ok());
+ task_counter.store(0);
+ concurrent_tasks.store(0);
+ max_concurrent_tasks.store(0);
+
+ auto error_tasks = tasks;
+ error_tasks.insert(error_tasks.begin(), [&]{
+ // This task fails immediately
+ return Status::InternalError<false>("error");
+ });
+
+ EXPECT_FALSE(bthread_fork_join(error_tasks, concurrency).ok());
auto end = steady_clock::now();
- auto elapsed = duration_cast<milliseconds>(end - start).count();
- EXPECT_LE(elapsed, 40); // at most 1 round running for 7 tasks
+
+ // When first task fails, not all tasks may complete
+ // We can only verify that at least one task ran
+ EXPECT_LE(task_counter.load(), concurrency);
+ EXPECT_GE(duration_cast<microseconds>(end - start).count(), sleep_us);
}
+
+ // Test asynchronous error handling
{
- std::future<Status> fut;
auto start = steady_clock::now();
- auto t = tasks;
- EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
auto end = steady_clock::now();
- auto elapsed = duration_cast<milliseconds>(end - start).count();
- EXPECT_LE(elapsed, 40); // async
+ task_counter.store(0);
+ concurrent_tasks.store(0);
+ max_concurrent_tasks.store(0);
+
+ auto error_tasks = tasks;
+ error_tasks.insert(error_tasks.begin(), [&]{
+ // This task fails immediately
+ return Status::InternalError<false>("error");
+ });
+
+ std::future<Status> fut;
+ auto t = error_tasks;
+ EXPECT_TRUE(bthread_fork_join(std::move(t), concurrency, &fut).ok()); // return immediately
+ EXPECT_LT(task_counter.load(), concurrency);
+
+ // Initially no tasks should have completed
+ EXPECT_EQ(task_counter.load(), 0);
+
+ // Wait for completion - should fail
EXPECT_FALSE(fut.get().ok());
end = steady_clock::now();
- elapsed = duration_cast<milliseconds>(end - start).count();
- EXPECT_LE(elapsed, 40); // at most 1 round running for 7 tasks
+
+ // When first task fails, not all tasks may complete
+ // We can only verify that at least one task ran
+ EXPECT_LE(task_counter.load(), concurrency);
+ EXPECT_GE(duration_cast<microseconds>(end - start).count(), sleep_us);
}
// clang-format on
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]