This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a18d2bfa22e [Bug](cte) set rec cte source dep ready after exchange
reset (#59768)
a18d2bfa22e is described below
commit a18d2bfa22ee6dd869643e1dd7fa3151865f6d86
Author: Pxl <[email protected]>
AuthorDate: Wed Jan 14 13:37:17 2026 +0800
[Bug](cte) set rec cte source dep ready after exchange reset (#59768)
This pull request fixes where `set_ready()` is called on the source dependency
of `RecCTESharedState` in the pipeline execution logic. The readiness signal
must be raised only after the exchange source has been reset; otherwise the
next recursive round can start before the exchange source is reset, fail to
find a receiver, and block forever.
Key changes:
**Pipeline execution logic:**
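* Moved the `set_ready()` call on `RecCTESharedState`'s `source_dep` out of
`RecCTESinkOperatorX::sink()` (where it fired on eos) into the new
`RecCTESinkOperatorX::reset_to_rerun()`, which `PipelineTask::execute()` calls
once the sink reaches eos. The call now happens only after the root (exchange)
operator has been reset, which avoids the blocking issue.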
[[1]](diffhunk://#diff-00c79e5e3bf6d92b947372a45d5ad4df30ac275c831ffb7b634f233a9fb2229eL86-L89)
[[2]](diffhunk://#diff-dd9f3f47f0ec9d346d560dc86f3c0c518f21d5e512e62b623b5db48bddf04ebbR610-R620)
---
be/src/pipeline/exec/exchange_source_operator.h | 2 +-
be/src/pipeline/exec/operator.h | 7 ++++++-
be/src/pipeline/exec/rec_cte_sink_operator.h | 16 ++++++++++------
be/src/pipeline/exec/sort_sink_operator.cpp | 3 ++-
be/src/pipeline/exec/sort_sink_operator.h | 2 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 18 ++++--------------
7 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 03f2a288cdf..b11e5bef6a0 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -100,7 +100,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status reset(RuntimeState* state);
+ Status reset(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e7fa2c4ca47..1b6afecd0e6 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -29,6 +29,7 @@
#include <vector>
#include "common/be_mock_util.h"
+#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "pipeline/dependency.h"
@@ -158,6 +159,10 @@ public:
RuntimeState* /*state*/) const;
[[nodiscard]] virtual bool
require_shuffled_data_distribution(RuntimeState* /*state*/) const;
+ virtual Status reset(RuntimeState* state) {
+ return Status::InternalError("Reset is not implemented in operator:
{}", get_name());
+ }
+
protected:
OperatorPtr _child = nullptr;
@@ -613,7 +618,7 @@ public:
// For agg/sort/join sink.
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
- virtual bool need_rerun(RuntimeState* state) const { return false; }
+ virtual bool reset_to_rerun(RuntimeState* state, OperatorXBase* root)
const { return false; }
Status init(const TDataSink& tsink) override;
[[nodiscard]] virtual Status init(RuntimeState* state, ExchangeType type,
const int num_buckets,
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h
b/be/src/pipeline/exec/rec_cte_sink_operator.h
index 01ccccd41b4..e4d6022758c 100644
--- a/be/src/pipeline/exec/rec_cte_sink_operator.h
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.h
@@ -62,8 +62,16 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- bool need_rerun(RuntimeState* state) const override {
- return get_local_state(state)._shared_state->ready_to_return == false;
+ bool reset_to_rerun(RuntimeState* state, OperatorXBase* root) const
override {
+ auto* shared_state = get_local_state(state)._shared_state;
+ if (shared_state->ready_to_return == false) {
+ THROW_IF_ERROR(root->reset(state));
+ // must set_ready after root(exchange) reset
+ // if next round executed before exchange source reset, it maybe
cant find receiver and cause blocked forever
+ shared_state->source_dep->set_ready();
+ return true;
+ }
+ return false;
}
std::shared_ptr<BasicSharedState> create_shared_state() const override {
return nullptr; }
@@ -83,10 +91,6 @@ public:
RETURN_IF_ERROR(materialize_block(local_state._child_expr,
input_block, &block, true));
RETURN_IF_ERROR(local_state._shared_state->emplace_block(state,
std::move(block)));
}
-
- if (eos) {
- local_state._shared_state->source_dep->set_ready();
- }
return Status::OK();
}
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 10f89c38722..b85373f7663 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -189,9 +189,10 @@ Status
SortSinkOperatorX::merge_sort_read_for_spill(RuntimeState* state,
return local_state._shared_state->sorter->merge_sort_read_for_spill(state,
block, batch_size,
eos);
}
-void SortSinkOperatorX::reset(RuntimeState* state) {
+Status SortSinkOperatorX::reset(RuntimeState* state) {
auto& local_state = get_local_state(state);
local_state._shared_state->sorter->reset();
+ return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index e6c8f3183cc..b520dbb3111 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -99,7 +99,7 @@ public:
Status merge_sort_read_for_spill(RuntimeState* state,
doris::vectorized::Block* block,
int batch_size, bool* eos);
- void reset(RuntimeState* state);
+ Status reset(RuntimeState* state) override;
int64_t limit() const { return _limit; }
int64_t offset() const { return _offset; }
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 1880f1d2e76..9669f508d7a 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -230,7 +230,7 @@ Status
SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state, TUnique
RETURN_IF_ERROR(status);
block.clear_column_data();
}
- parent._sort_sink_operator->reset(_runtime_state.get());
+ RETURN_IF_ERROR(parent._sort_sink_operator->reset(_runtime_state.get()));
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 27f0b1af29f..7e5ada8881a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -554,10 +554,6 @@ Status PipelineTask::execute(bool* done) {
}
}
- if (_eos && !_sink->need_rerun(_state)) {
- RETURN_IF_ERROR(close(Status::OK(), false));
- }
-
DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
auto required_pipeline_id =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -597,16 +593,10 @@ Status PipelineTask::execute(bool* done) {
status = _sink->sink(_state, block, _eos);
if (_eos) {
- if (_sink->need_rerun(_state)) {
- if (auto* source =
dynamic_cast<ExchangeSourceOperatorX*>(_root);
- source != nullptr) {
- RETURN_IF_ERROR(source->reset(_state));
- _eos = false;
- } else {
- return Status::InternalError(
- "Only ExchangeSourceOperatorX can be rerun,
real is {}",
- _root->get_name());
- }
+ if (_sink->reset_to_rerun(_state, _root)) {
+ _eos = false;
+ } else {
+ RETURN_IF_ERROR(close(Status::OK(), false));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]