This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a029a6fa4f3 branch-4.0: [chore](UT) Make cloud mgr test stable #57033 (#57053)
a029a6fa4f3 is described below

commit a029a6fa4f35c7d2efa399e37c09bf9e467ccda8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 17 08:51:02 2025 +0800

    branch-4.0: [chore](UT) Make cloud mgr test stable #57033 (#57053)
    
    Cherry-picked from #57033
    
    Co-authored-by: Gavin Chou <[email protected]>
---
 be/test/cloud/cloud_meta_mgr_test.cpp | 135 ++++++++++++++++++++++++++--------
 1 file changed, 103 insertions(+), 32 deletions(-)

diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp b/be/test/cloud/cloud_meta_mgr_test.cpp
index df3df147959..3f2f7fe4ef6 100644
--- a/be/test/cloud/cloud_meta_mgr_test.cpp
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -21,6 +21,7 @@
 
 #include <chrono>
 #include <memory>
+#include <random>
 #include <set>
 
 #include "cloud/cloud_storage_engine.h"
@@ -44,57 +45,127 @@ class CloudMetaMgrTest : public testing::Test {
 
 TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
     // clang-format off
-    std::vector<std::function<Status()>> tasks {
-        []{ bthread_usleep(20000); return Status::OK(); },
-        []{ bthread_usleep(20000); return Status::OK(); },
-        []{ bthread_usleep(20000); return Status::OK(); },
-        []{ bthread_usleep(20000); return Status::OK(); },
-        []{ bthread_usleep(20000); return Status::OK(); },
-        []{ bthread_usleep(20000); return Status::OK(); },
-        []{ bthread_usleep(20000); return Status::OK(); },
-    };
+    std::atomic<int> task_counter{0};
+    std::atomic<int> concurrent_tasks{0};
+    std::atomic<int> max_concurrent_tasks{0};
+    int num_tasks = 7;
+    int concurrency = 3;
+    int sleep_us = 10000;
+
+    std::mt19937 rng(system_clock::now().time_since_epoch().count());
+    std::uniform_int_distribution<int> dist(0, sleep_us);
+    std::vector<std::function<Status()>> tasks (
+        num_tasks,
+        [&]{
+            int current = ++concurrent_tasks;
+            int old_max = max_concurrent_tasks.load();
+            while (current > old_max && !max_concurrent_tasks.compare_exchange_weak(old_max, current)) {
+                // Update max if needed
+            }
+            bthread_usleep(sleep_us);
+            task_counter.fetch_add(1);
+            --concurrent_tasks;
+            return Status::OK();
+        }
+    );
+
+    // Test synchronous execution
     {
         auto start = steady_clock::now();
-        EXPECT_TRUE(bthread_fork_join(tasks, 3).ok());
+        task_counter.store(0);
+        concurrent_tasks.store(0);
+        max_concurrent_tasks.store(0);
+
+        EXPECT_TRUE(bthread_fork_join(tasks, concurrency).ok());
         auto end = steady_clock::now();
-        auto elapsed = duration_cast<milliseconds>(end - start).count();
-        EXPECT_GT(elapsed, 40); // at least 2 rounds running for 7 tasks
+
+        // All 7 tasks should have completed
+        EXPECT_EQ(task_counter.load(), num_tasks);
+        // With concurrency 3, we should see at most 3 concurrent tasks
+        EXPECT_LE(max_concurrent_tasks.load(), concurrency);
+        EXPECT_GT(max_concurrent_tasks.load(), 0);
+        EXPECT_GE(duration_cast<microseconds>(end - start).count(), (num_tasks * sleep_us / concurrency));
     }
+
+    // Test asynchronous execution
     {
-        std::future<Status> fut;
         auto start = steady_clock::now();
+        task_counter.store(0);
+        concurrent_tasks.store(0);
+        max_concurrent_tasks.store(0);
+
+        std::future<Status> fut;
         auto t = tasks;
-        EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
-        auto end = steady_clock::now();
-        auto elapsed = duration_cast<milliseconds>(end - start).count();
-        EXPECT_LE(elapsed, 40); // async
+        EXPECT_TRUE(bthread_fork_join(std::move(t), concurrency, &fut).ok()); // return immediately
+        EXPECT_LT(task_counter.load(), num_tasks);
+
+        // Initially no tasks should have completed
+        EXPECT_EQ(task_counter.load(), 0);
+
+        // Wait for completion
         EXPECT_TRUE(fut.get().ok());
-        end = steady_clock::now();
-        elapsed = duration_cast<milliseconds>(end - start).count();
-        EXPECT_GT(elapsed, 40); // at least 2 rounds running for 7 tasks
+        auto end = steady_clock::now();
+
+        // All 7 tasks should have completed
+        EXPECT_EQ(task_counter.load(), num_tasks);
+        // With concurrency 3, we should see at most 3 concurrent tasks
+        EXPECT_LE(max_concurrent_tasks.load(), concurrency);
+        EXPECT_GT(max_concurrent_tasks.load(), 0);
+        EXPECT_GE(duration_cast<microseconds>(end - start).count(), (num_tasks * sleep_us / concurrency));
     }
 
-    // make the first batch fail fast
-    tasks.insert(tasks.begin(), []{ bthread_usleep(20000); return Status::InternalError<false>("error"); });
+    // Test error handling - make the first task fail fast
     {
         auto start = steady_clock::now();
-        EXPECT_FALSE(bthread_fork_join(tasks, 3).ok());
+        task_counter.store(0);
+        concurrent_tasks.store(0);
+        max_concurrent_tasks.store(0);
+
+        auto error_tasks = tasks;
+        error_tasks.insert(error_tasks.begin(), [&]{
+            // This task fails immediately
+            return Status::InternalError<false>("error");
+        });
+
+        EXPECT_FALSE(bthread_fork_join(error_tasks, concurrency).ok());
         auto end = steady_clock::now();
-        auto elapsed = duration_cast<milliseconds>(end - start).count();
-        EXPECT_LE(elapsed, 40); // at most 1 round running for 7 tasks
+
+        // When first task fails, not all tasks may complete
+        // We can only verify that at least one task ran
+        EXPECT_LE(task_counter.load(), concurrency);
+        EXPECT_GE(duration_cast<microseconds>(end - start).count(), sleep_us);
     }
+
+    // Test asynchronous error handling
     {
-        std::future<Status> fut;
         auto start = steady_clock::now();
-        auto t = tasks;
-        EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
         auto end = steady_clock::now();
-        auto elapsed = duration_cast<milliseconds>(end - start).count();
-        EXPECT_LE(elapsed, 40); // async
+        task_counter.store(0);
+        concurrent_tasks.store(0);
+        max_concurrent_tasks.store(0);
+
+        auto error_tasks = tasks;
+        error_tasks.insert(error_tasks.begin(), [&]{
+            // This task fails immediately
+            return Status::InternalError<false>("error");
+        });
+
+        std::future<Status> fut;
+        auto t = error_tasks;
+        EXPECT_TRUE(bthread_fork_join(std::move(t), concurrency, &fut).ok()); // return immediately
+        EXPECT_LT(task_counter.load(), concurrency);
+
+        // Initially no tasks should have completed
+        EXPECT_EQ(task_counter.load(), 0);
+
+        // Wait for completion - should fail
         EXPECT_FALSE(fut.get().ok());
         end = steady_clock::now();
-        elapsed = duration_cast<milliseconds>(end - start).count();
-        EXPECT_LE(elapsed, 40); // at most 1 round running for 7 tasks
+
+        // When first task fails, not all tasks may complete
+        // We can only verify that at least one task ran
+        EXPECT_LE(task_counter.load(), concurrency);
+        EXPECT_GE(duration_cast<microseconds>(end - start).count(), sleep_us);
     }
     // clang-format on
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to