This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0e5fc3737c3 branch-3.1: [fix](pipeline) premature exit causing core dump during concurrent prepare execution #51492 (#52364)
0e5fc3737c3 is described below

commit 0e5fc3737c387cb94cb2f662fd3f31834b5e7882
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 27 10:38:22 2025 +0800

    branch-3.1: [fix](pipeline) premature exit causing core dump during concurrent prepare execution #51492 (#52364)
    
    Cherry-pick from #51492
    
    
    Co-authored-by: XLPE <[email protected]>
    Co-authored-by: XLPE <[email protected]>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 38 ++++++++++++++-------------
 be/src/util/countdown_latch.h                 |  7 +++++
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 81a84451e89..a8e62f4f6d3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -106,6 +106,7 @@
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/container_util.hpp"
+#include "util/countdown_latch.h"
 #include "util/debug_util.h"
 #include "util/uid_util.h"
 #include "vec/common/sort/heap_sorter.h"
@@ -513,27 +514,28 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
          target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
         // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
         std::vector<Status> prepare_status(target_size);
-        std::mutex m;
-        std::condition_variable cv;
-        int prepare_done = 0;
+        int submitted_tasks = 0;
+        Status submit_status;
+        CountDownLatch latch((int)target_size);
         for (size_t i = 0; i < target_size; i++) {
-            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+            submit_status = thread_pool->submit_func([&, i]() {
                 SCOPED_ATTACH_TASK(_query_ctx.get());
                 prepare_status[i] = pre_and_submit(i, this);
-                std::unique_lock<std::mutex> lock(m);
-                prepare_done++;
-                if (prepare_done == target_size) {
-                    cv.notify_one();
-                }
-            }));
-        }
-        std::unique_lock<std::mutex> lock(m);
-        if (prepare_done != target_size) {
-            cv.wait(lock);
-            for (size_t i = 0; i < target_size; i++) {
-                if (!prepare_status[i].ok()) {
-                    return prepare_status[i];
-                }
+                latch.count_down();
+            });
+            if (LIKELY(submit_status.ok())) {
+                submitted_tasks++;
+            } else {
+                break;
+            }
+        }
+        latch.arrive_and_wait(target_size - submitted_tasks);
+        if (UNLIKELY(!submit_status.ok())) {
+            return submit_status;
+        }
+        for (int i = 0; i < submitted_tasks; i++) {
+            if (!prepare_status[i].ok()) {
+                return prepare_status[i];
             }
         }
     } else {
diff --git a/be/src/util/countdown_latch.h b/be/src/util/countdown_latch.h
index a41a417d20f..b27737e8bb1 100644
--- a/be/src/util/countdown_latch.h
+++ b/be/src/util/countdown_latch.h
@@ -91,6 +91,13 @@ public:
         }
     }
 
+    // Decrements the internal counter by n and blocks the calling thread until the counter reaches zero.
+    void arrive_and_wait(uint64_t n) {
+        DCHECK_GE(n, 0);
+        count_down(n);
+        wait();
+    }
+
     uint64_t count() const {
         std::lock_guard<std::mutex> lock(_lock);
         return _count;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to