This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 0e5fc3737c3 branch-3.1: [fix](pipeline) premature exit causing core dump during concurrent prepare execution #51492 (#52364)
0e5fc3737c3 is described below
commit 0e5fc3737c387cb94cb2f662fd3f31834b5e7882
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 27 10:38:22 2025 +0800
branch-3.1: [fix](pipeline) premature exit causing core dump during concurrent prepare execution #51492 (#52364)
Cherry-pick from #51492
Co-authored-by: XLPE <[email protected]>
Co-authored-by: XLPE <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 38 ++++++++++++++-------------
be/src/util/countdown_latch.h | 7 +++++
2 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 81a84451e89..a8e62f4f6d3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -106,6 +106,7 @@
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/container_util.hpp"
+#include "util/countdown_latch.h"
#include "util/debug_util.h"
#include "util/uid_util.h"
#include "vec/common/sort/heap_sorter.h"
@@ -513,27 +514,28 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
target_size >
_runtime_state->query_options().parallel_prepare_threshold)) {
// If instances parallelism is big enough ( >
parallel_prepare_threshold), we will prepare all tasks by multi-threads
std::vector<Status> prepare_status(target_size);
- std::mutex m;
- std::condition_variable cv;
- int prepare_done = 0;
+ int submitted_tasks = 0;
+ Status submit_status;
+ CountDownLatch latch((int)target_size);
for (size_t i = 0; i < target_size; i++) {
- RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+ submit_status = thread_pool->submit_func([&, i]() {
SCOPED_ATTACH_TASK(_query_ctx.get());
prepare_status[i] = pre_and_submit(i, this);
- std::unique_lock<std::mutex> lock(m);
- prepare_done++;
- if (prepare_done == target_size) {
- cv.notify_one();
- }
- }));
- }
- std::unique_lock<std::mutex> lock(m);
- if (prepare_done != target_size) {
- cv.wait(lock);
- for (size_t i = 0; i < target_size; i++) {
- if (!prepare_status[i].ok()) {
- return prepare_status[i];
- }
+ latch.count_down();
+ });
+ if (LIKELY(submit_status.ok())) {
+ submitted_tasks++;
+ } else {
+ break;
+ }
+ }
+ latch.arrive_and_wait(target_size - submitted_tasks);
+ if (UNLIKELY(!submit_status.ok())) {
+ return submit_status;
+ }
+ for (int i = 0; i < submitted_tasks; i++) {
+ if (!prepare_status[i].ok()) {
+ return prepare_status[i];
}
}
} else {
diff --git a/be/src/util/countdown_latch.h b/be/src/util/countdown_latch.h
index a41a417d20f..b27737e8bb1 100644
--- a/be/src/util/countdown_latch.h
+++ b/be/src/util/countdown_latch.h
@@ -91,6 +91,13 @@ public:
}
}
+ // Decrements the counter by n (n <= current count) and blocks until the counter reaches zero.
+ void arrive_and_wait(uint64_t n) {
+ DCHECK_LE(n, count());
+ count_down(n);
+ wait();
+ }
+
uint64_t count() const {
std::lock_guard<std::mutex> lock(_lock);
return _count;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]