Copilot commented on code in PR #63539:
URL: https://github.com/apache/doris/pull/63539#discussion_r3287453644
##########
be/test/pipeline/pipeline_task_test.cpp:
##########
@@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
}
}
+// Test for the race condition fix between _wake_up_early and _is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ {
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+ }
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
+ profile.get(),
shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ {
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ }
+ _query_ctx->get_execution_dependency()->set_ready();
+
+ auto* sink_finish_dep =
+
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+ EXPECT_NE(sink_finish_dep, nullptr);
+ sink_finish_dep->block();
+
+ task->_operators.front()->cast<DummyOperator>()._eos = true;
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ }
+
+ sink_finish_dep->set_ready();
+ config::enable_debug_points = true;
+
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_TRUE(task->_eos);
+ EXPECT_TRUE(done);
+ EXPECT_TRUE(task->_wake_up_early);
+
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+ EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+ }
+ DebugPoints::instance()->clear();
+ config::enable_debug_points = false;
Review Comment:
This test mutates global debug-point state but doesn’t restore the previous
values: it unconditionally sets `config::enable_debug_points` to true/false and
calls `DebugPoints::instance()->clear()` (which removes *all* debug points). If
another test (or suite-level setup) had debug points enabled or had other
active debug points, this test would silently change global state and could
cause unrelated flakes. Prefer saving the original
`config::enable_debug_points` value and restoring it via RAII/Defer, and remove
only the specific debug point you add (e.g.
`remove("PipelineTask::execute.wake_up_early_in_else_if")`) instead of
`clear()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]