This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2b9a1a5523b [Bug](pipeline) fix wake up early without terminate call
(#61679)
2b9a1a5523b is described below
commit 2b9a1a5523ba837f28393ba96c4416acd6dd9239
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 26 16:02:33 2026 +0800
[Bug](pipeline) fix wake up early without terminate call (#61679)
```
Thread A (正在执行 HashJoin Build Task)              Thread B (下游 pipeline 全部完成)
────────────────────────────────────────            ──────────────────────────────────
Defer 开始执行:
  line 475: 读取 _wake_up_early → false
                                                    decrement_running_task()
                                                      触发 make_all_runnable():
                                                        line 127: set_wake_up_early() → true
                                                        line 132: terminate()
                                                          → finish_dep.set_always_ready()
  line 481: else if (_eos && !_spilling && !_is_pending_finish())
            _is_pending_finish() = false  ← 因为 always_ready!
  line 483: *done = true
            ← 注意: _sink->terminate() 从未被调用!
close_task():
  task->close(OK):
```
This pull request addresses a subtle race condition in the pipeline task
execution logic and adds a targeted test to verify the fix. The main
improvement ensures that operator termination is reliably triggered even
in the presence of concurrent state changes, preventing operators from
being left in an inconsistent state. Additionally, the pull request
introduces a debug point for precise testing and includes minor test
code cleanups.
**Race condition fix and test coverage:**
* Fixed a race condition in `PipelineTask::execute()` by reordering the
logic to ensure `terminate()` is always called if required, even when
another thread updates task state between checks. Added a debug point to
simulate the race for testing.
* Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp`
that uses the debug point to reliably reproduce and verify the race
condition fix, ensuring operator termination is not skipped.
**Test infrastructure and cleanup:**
* Included `debug_points.h` and `common/config.h` in
`pipeline_task_test.cpp` to support debug point injection and
configuration toggling for the new test.
[[1]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR21)
[[2]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR36)
* Minor formatting cleanup in an existing test case for readability.
---
be/src/exec/pipeline/pipeline_task.cpp | 32 +++++++++-
be/test/exec/pipeline/pipeline_task_test.cpp | 91 ++++++++++++++++++++++++++++
2 files changed, 120 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/pipeline/pipeline_task.cpp
b/be/src/exec/pipeline/pipeline_task.cpp
index ffd1817683f..1b82530ebb8 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
// If task is woke up early, we should terminate all operators, and
this task could be closed immediately.
if (_wake_up_early) {
- terminate();
- THROW_IF_ERROR(_root->terminate(_state));
- THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
(fragment_context->is_canceled() || !_is_pending_finish()))
{
+ // Debug point for testing the race condition fix: inject
set_wake_up_early() +
+ // terminate() here to simulate Thread B writing A then B between
Thread A's two
+ // reads of _wake_up_early.
+ DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+ set_wake_up_early();
+ terminate();
+ });
*done = true;
}
+
+ // NOTE: The terminate() call is intentionally placed AFTER the
_is_pending_finish() check
+ // above, not before. This ordering is critical to avoid a race
condition:
+ //
+ // Pipeline::make_all_runnable() writes in this order:
+ // (A) set_wake_up_early() -> (B) terminate() [sets
finish_dep._always_ready]
+ //
+ // If we checked _wake_up_early (A) before _is_pending_finish() (B),
there would be a
+ // window where Thread A reads _wake_up_early=false, then Thread B
writes both A and B,
+ // then Thread A reads _is_pending_finish()=false (due to
_always_ready). Thread A would
+ // then set *done=true without ever calling operator terminate(),
causing close() to run
+ // on operators that were never properly terminated (e.g.
RuntimeFilterProducer still in
+ // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+ //
+ // By reading _is_pending_finish() (B) before the second read of
_wake_up_early (A),
+ // if Thread A observes B's effect (_always_ready=true), it is
guaranteed to also observe
+ // A's effect (_wake_up_early=true) on this second read, ensuring
terminate() is called.
+ if (_wake_up_early) {
+ terminate();
+ THROW_IF_ERROR(_root->terminate(_state));
+ THROW_IF_ERROR(_sink->terminate(_state));
+ }
}};
const auto query_id = _state->query_id();
// If this task is already EOS and block is empty (which means we already
output all blocks),
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp
b/be/test/exec/pipeline/pipeline_task_test.cpp
index b47f00af89f..db61819c4da 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "common/config.h"
#include "common/status.h"
#include "exec/operator/operator.h"
#include "exec/operator/spill_utils.h"
@@ -32,6 +33,7 @@
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
+#include "util/debug_points.h"
namespace doris {
@@ -1534,4 +1536,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
}
}
+// Test for the race condition fix between _wake_up_early and
_is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A)
set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A
reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads
_is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling
operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So
if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect
(_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate
the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after
_is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second
_wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ {
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+ }
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
+ profile.get(),
shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ {
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ }
+ _query_ctx->get_execution_dependency()->set_ready();
+
+ // Get the sink's finish dependency and block it to simulate a pending
async operation
+ // (e.g. runtime filter size sync RPC in flight).
+ auto* sink_finish_dep =
+
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+ EXPECT_NE(sink_finish_dep, nullptr);
+ sink_finish_dep->block();
+
+ // Drive the task to EOS so it will enter the Defer's pending-finish check.
+ task->_operators.front()->cast<DummyOperator>()._eos = true;
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ // EOS reached but still blocked on finish dependency: not done yet.
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ }
+
+ // Now unblock the finish dependency (simulates the async op completing)
and activate the
+ // debug point. The debug point fires inside the else-if branch — after
_is_pending_finish()
+ // returns false but before the second _wake_up_early read — and calls
set_wake_up_early() +
+ // terminate(). This precisely reproduces the race where Thread B's writes
land between
+ // Thread A's two reads of _wake_up_early.
+ sink_finish_dep->set_ready();
+ config::enable_debug_points = true;
+
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_TRUE(task->_eos);
+ EXPECT_TRUE(done);
+ // The key assertion: even though the task took the else-if path (not
the
+ // if(_wake_up_early) path), operator terminate() must have been
called because the
+ // second read of _wake_up_early correctly observed the value set by
the debug point.
+ EXPECT_TRUE(task->_wake_up_early);
+
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+ EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+ }
+ DebugPoints::instance()->clear();
+ config::enable_debug_points = false;
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]