This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 35a2633ab92 [fix](pipeline) premature exit causing core dump during 
concurrent pr… (#52365)
35a2633ab92 is described below

commit 35a2633ab921195250a25902d217ea6b7986bd87
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 27 10:15:47 2025 +0800

    [fix](pipeline) premature exit causing core dump during concurrent pr… 
(#52365)
    
    …epare execution (#51492)
    
    Issue Number: close #51491
    
    Problem Summary:
    When the queue of the FragmentMgrAsync thread pool is full, newly
    submitted tasks are rejected and return early. However, previously
    submitted tasks may still be scheduled for execution later. This can
    lead to premature destruction of objects such as PipelineFragmentContext
    and TPipelineFragmentParams that are referenced by those tasks,
    resulting in null pointer exceptions during task execution and
    ultimately causing a coredump.
    
    The policy of this PR is to wait until all previously submitted tasks
    have completed before returning.
    
    ```
    *** SIGSEGV address not mapped to object (@0x1c8) received by PID 3941201 
(TID 2115617 OR 0xfe1685bb97f0) from PID 456; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, 
siginfo_t*, void*) at 
/home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/common/signal_handler.h:421
     1# os::Linux::chained_handler(int, siginfo_t*, void*) in 
/usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
     2# JVM_handle_linux_signal in 
/usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
     3# signalHandler(int, siginfo_t*, void*) in 
/usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
     4# 0x0000FFFF6B2A07C0 in linux-vdso.so.1
     5# doris::TUniqueId::TUniqueId(doris::TUniqueId const&) at 
/home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/gensrc/build/gen_cpp/Types_types.cpp:2354
     6# doris::AttachTask::AttachTask(doris::QueryContext*) at 
/home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/runtime/thread_context.cpp:60
     7# std::_Function_handler<void (), 
doris::pipeline::PipelineXFragmentContext::_build_pipeline_x_tasks(doris::TPipelineFragmentParams
 const&, doris::ThreadPool*)::$_0>::_M_invoke(std::_Any_data const&) at 
/usr/lib/gcc/aarch64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
     8# doris::ThreadPool::dispatch_thread() at 
/home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/util/threadpool.cpp:552
     9# doris::Thread::supervise_thread(void*) at 
/home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/util/thread.cpp:499
    10# 0x0000FFFF6AF187AC in /lib64/libpthread.so.0
    11# 0x0000FFFF6B16548C in /lib64/libc.so.6
    ```
    
    Co-authored-by: XLPE <[email protected]>
    Co-authored-by: XLPE <[email protected]>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 38 ++++++++++++++-------------
 be/src/util/countdown_latch.h                 |  7 +++++
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 81a84451e89..a8e62f4f6d3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -106,6 +106,7 @@
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/container_util.hpp"
+#include "util/countdown_latch.h"
 #include "util/debug_util.h"
 #include "util/uid_util.h"
 #include "vec/common/sort/heap_sorter.h"
@@ -513,27 +514,28 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
          target_size > 
_runtime_state->query_options().parallel_prepare_threshold)) {
         // If instances parallelism is big enough ( > 
parallel_prepare_threshold), we will prepare all tasks by multi-threads
         std::vector<Status> prepare_status(target_size);
-        std::mutex m;
-        std::condition_variable cv;
-        int prepare_done = 0;
+        int submitted_tasks = 0;
+        Status submit_status;
+        CountDownLatch latch((int)target_size);
         for (size_t i = 0; i < target_size; i++) {
-            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+            submit_status = thread_pool->submit_func([&, i]() {
                 SCOPED_ATTACH_TASK(_query_ctx.get());
                 prepare_status[i] = pre_and_submit(i, this);
-                std::unique_lock<std::mutex> lock(m);
-                prepare_done++;
-                if (prepare_done == target_size) {
-                    cv.notify_one();
-                }
-            }));
-        }
-        std::unique_lock<std::mutex> lock(m);
-        if (prepare_done != target_size) {
-            cv.wait(lock);
-            for (size_t i = 0; i < target_size; i++) {
-                if (!prepare_status[i].ok()) {
-                    return prepare_status[i];
-                }
+                latch.count_down();
+            });
+            if (LIKELY(submit_status.ok())) {
+                submitted_tasks++;
+            } else {
+                break;
+            }
+        }
+        latch.arrive_and_wait(target_size - submitted_tasks);
+        if (UNLIKELY(!submit_status.ok())) {
+            return submit_status;
+        }
+        for (int i = 0; i < submitted_tasks; i++) {
+            if (!prepare_status[i].ok()) {
+                return prepare_status[i];
             }
         }
     } else {
diff --git a/be/src/util/countdown_latch.h b/be/src/util/countdown_latch.h
index a41a417d20f..b27737e8bb1 100644
--- a/be/src/util/countdown_latch.h
+++ b/be/src/util/countdown_latch.h
@@ -91,6 +91,13 @@ public:
         }
     }
 
+    // decrements the internal counter by n and blocks the calling thread 
until the counter reaches zero.
+    void arrive_and_wait(uint64_t n) {
+        DCHECK_GE(n, 0);
+        count_down(n);
+        wait();
+    }
+
     uint64_t count() const {
         std::lock_guard<std::mutex> lock(_lock);
         return _count;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to