This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7037abc6231 [refactor](close)Full refactor async writer (#30082)
7037abc6231 is described below
commit 7037abc6231b2bba7c4ac3e2d4b34ca8a09ec061
Author: yiguolei <[email protected]>
AuthorDate: Tue Jan 23 13:21:17 2024 +0800
[refactor](close)Full refactor async writer (#30082)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/exec/data_sink.h | 4 --
be/src/exec/exec_node.h | 4 --
be/src/pipeline/exec/exchange_sink_buffer.cpp | 53 ++++++++++----
be/src/pipeline/exec/exchange_sink_buffer.h | 5 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 34 +++++----
be/src/pipeline/exec/exchange_sink_operator.h | 2 -
be/src/pipeline/exec/operator.h | 6 --
be/src/pipeline/pipeline_fragment_context.cpp | 7 +-
be/src/pipeline/pipeline_fragment_context.h | 1 -
be/src/pipeline/pipeline_task.cpp | 13 +---
be/src/pipeline/pipeline_task.h | 5 --
be/src/pipeline/pipeline_x/operator.cpp | 21 +++---
be/src/pipeline/pipeline_x/operator.h | 13 ----
.../pipeline_x/pipeline_x_fragment_context.cpp | 6 +-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 15 +---
be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 -
be/src/pipeline/task_scheduler.cpp | 82 ++++++++++++----------
be/src/pipeline/task_scheduler.h | 6 +-
be/src/runtime/task_execution_context.h | 1 +
be/src/vec/sink/async_writer_sink.h | 19 ++---
be/src/vec/sink/vdata_stream_sender.cpp | 22 +++---
be/src/vec/sink/vdata_stream_sender.h | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 27 ++++---
be/src/vec/sink/writer/async_result_writer.h | 4 +-
24 files changed, 177 insertions(+), 178 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index be6cfe236b7..c0b27e8ae90 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -70,10 +70,6 @@ public:
return send(state, block, eos);
}
-    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
-        return Status::OK();
-    }
-
[[nodiscard]] virtual bool is_pending_finish() const { return false; }
// Releases all resources that were allocated in prepare()/send().
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 903122ecded..5a6b04667e7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -232,10 +232,6 @@ public:
size_t children_count() const { return _children.size(); }
-    // when the fragment is normal finished, call this method to do some finish work
-    // such as send the last buffer to remote.
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
    std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }
protected:
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 4484a34375b..9d240945be1 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -85,14 +85,15 @@ namespace pipeline {
template <typename Parent>
ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
-                                               int send_id, int be_number, QueryContext* context)
-        : _queue_capacity(0),
+                                               int send_id, int be_number, RuntimeState* state)
+        : HasTaskExecutionCtx(state),
+          _queue_capacity(0),
          _is_finishing(false),
          _query_id(query_id),
          _dest_node_id(dest_node_id),
          _sender_id(send_id),
          _be_number(be_number),
-          _context(context) {}
+          _context(state->get_query_ctx()) {}
template <typename Parent>
ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
@@ -270,12 +271,25 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (config::exchange_sink_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
-        send_callback->addFailedHandler(
-                [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
+        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                const InstanceLoId& id, const std::string& err) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // The ExchangeSinkBuffer object has already been destroyed; no need to
+                // run the failed handler any more.
+                return;
+            }
+            _failed(id, err);
+        });
send_callback->start_rpc_time = GetCurrentTimeNanos();
-        send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                             const PTransmitDataResult& result,
-                                             const int64_t& start_rpc_time) {
+        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                 const InstanceLoId& id, const bool& eos,
+                                                 const PTransmitDataResult& result,
+                                                 const int64_t& start_rpc_time) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // The ExchangeSinkBuffer object has already been destroyed; no need to
+                // run the success handler any more.
+                return;
+            }
set_rpc_time(id, start_rpc_time, result.receive_time());
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -332,12 +346,25 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (config::exchange_sink_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
-        send_callback->addFailedHandler(
-                [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
+        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                const InstanceLoId& id, const std::string& err) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // The ExchangeSinkBuffer object has already been destroyed; no need to
+                // run the failed handler any more.
+                return;
+            }
+            _failed(id, err);
+        });
send_callback->start_rpc_time = GetCurrentTimeNanos();
-        send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                             const PTransmitDataResult& result,
-                                             const int64_t& start_rpc_time) {
+        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                 const InstanceLoId& id, const bool& eos,
+                                                 const PTransmitDataResult& result,
+                                                 const int64_t& start_rpc_time) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // The ExchangeSinkBuffer object has already been destroyed; no need to
+                // run the success handler any more.
+                return;
+            }
set_rpc_time(id, start_rpc_time, result.receive_time());
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
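The hunks above all apply one pattern: each brpc callback now captures a weak reference to the task execution context and bails out if it can no longer be locked, so a late RPC completion cannot touch a destroyed ExchangeSinkBuffer. A minimal, self-contained sketch of that pattern (SinkBuffer and TaskExecutionContext here are illustrative stand-ins, not Doris' actual classes):

#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Stand-in for the query/fragment-level task execution context.
struct TaskExecutionContext {};

class SinkBuffer {
public:
    explicit SinkBuffer(std::shared_ptr<TaskExecutionContext> ctx) : _task_ctx(ctx) {}

    // Build a callback that is safe to fire after the owning task is gone.
    std::function<void(const std::string&)> make_failed_handler() {
        return [this, weak_ctx = _task_ctx](const std::string& err) {
            auto locked = weak_ctx.lock();
            if (locked == nullptr) {
                // Task context already destroyed; the buffer may be gone too,
                // so touching `this` here would be a use-after-free.
                return;
            }
            _on_failed(err);
        };
    }

private:
    void _on_failed(const std::string& err) { std::cout << "rpc failed: " << err << "\n"; }

    std::weak_ptr<TaskExecutionContext> _task_ctx;
};

int main() {
    auto ctx = std::make_shared<TaskExecutionContext>();
    SinkBuffer buffer(ctx);
    auto handler = buffer.make_failed_handler();
    handler("timeout"); // context alive: handler runs
    ctx.reset();        // simulate task teardown
    handler("timeout"); // context gone: handler returns early
    return 0;
}

The lock is held only for the duration of the callback, which keeps the context alive while the handler body runs.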
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index a11b637f4d4..f0b55d528ec 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -196,9 +196,10 @@ struct ExchangeRpcContext {
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
template <typename Parent>
-class ExchangeSinkBuffer {
+class ExchangeSinkBuffer : public HasTaskExecutionCtx {
public:
- ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, QueryContext*);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
+                       RuntimeState* state);
~ExchangeSinkBuffer();
void register_sink(TUniqueId);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index aa29bb4cf0f..1f9ba3b4203 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -65,7 +65,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
    _sink_buffer = std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
-            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx());
+            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state);
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->register_pipeline_channels(_sink_buffer.get());
@@ -168,7 +168,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
    _sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());
+            id, p._dest_node_id, _sender_id, _state->be_number(), state);
register_channels(_sink_buffer.get());
auto* _exchange_sink_dependency = _dependency;
@@ -414,7 +414,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
// 1. calculate range
// 2. dispatch rows to channel
}
- return Status::OK();
+
+ Status final_st = Status::OK();
+ if (source_state == SourceState::FINISHED) {
+ local_state._serializer.reset_block();
+ for (int i = 0; i < local_state.channels.size(); ++i) {
+ Status st = local_state.channels[i]->close(state, Status::OK());
+ if (!st.ok() && final_st.ok()) {
+ final_st = st;
+ }
+ }
+ local_state._sink_buffer->set_should_stop();
+ return final_st;
+ }
+ return final_st;
}
Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vectorized::Block* src,
@@ -488,21 +501,6 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
return Status::OK();
}
-Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) {
-    auto& local_state = get_local_state(state);
-    local_state._serializer.reset_block();
-    Status final_st = Status::OK();
-    Status final_status = exec_status;
-    for (int i = 0; i < local_state.channels.size(); ++i) {
-        Status st = local_state.channels[i]->close(state, exec_status);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
-        }
-    }
-    local_state._sink_buffer->set_should_stop();
-    return final_st;
-}
-
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 6d1d1b6a4fe..3e6486e34fd 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -217,8 +217,6 @@ public:
    Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
                           int num_receivers = 1);
- Status try_close(RuntimeState* state, Status exec_status) override;
-
private:
friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index bf41c670e0c..8755872a979 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -239,8 +239,6 @@ public:
*/
virtual bool is_pending_finish() const { return false; }
- virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
bool is_closed() const { return _is_closed; }
    const OperatorBuilderBase* operator_builder() const { return _operator_builder; }
@@ -289,10 +287,6 @@ public:
return Status::OK();
}
- Status try_close(RuntimeState* state) override {
- return _sink->try_close(state, state->query_status());
- }
-
    [[nodiscard]] bool is_pending_finish() const override { return _sink->is_pending_finish(); }
Status close(RuntimeState* state) override {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 909039b23fb..2341071d963 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -163,13 +163,12 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// make result receiver on fe be stocked on rpc forever until timeout...
// We need a more detail discussion.
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            _is_report_on_cancel = false;
+        } else {
             LOG(WARNING) << "PipelineFragmentContext "
                          << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
                          << " is canceled, cancel message: " << msg;
-
-        } else {
-            _set_is_report_on_cancel(false); // TODO bug llj fix this not projected by lock
         }
_runtime_state->set_process_status(_query_ctx->exec_status());
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index a7a45d8f07f..0ec27c5054f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -158,7 +158,6 @@ protected:
Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
virtual void _close_fragment_instance();
void _init_next_report_time();
- void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
// Id of this query
TUniqueId _query_id;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index d87c477a8db..9b72762ebf9 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -301,17 +301,8 @@ Status PipelineTask::execute(bool* eos) {
_finish_p_dependency();
}
- return Status::OK();
-}
-
-Status PipelineTask::try_close(Status exec_status) {
- if (_try_close_flag) {
- return Status::OK();
- }
- _try_close_flag = true;
- Status status1 = _sink->try_close(_state);
- Status status2 = _source->try_close(_state);
- return status1.ok() ? status2 : status1;
+    // If the status is EOF (the sink node returns EOF when the downstream fragment
+    // has finished), then return it.
+    return status;
}
Status PipelineTask::close(Status exec_status) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 56e42370ff2..dd512293e05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -132,9 +132,6 @@ public:
virtual Status execute(bool* eos);
-    // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
-    // this task will enter the `PENDING_FINISH` state.
-    virtual Status try_close(Status exec_status);
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
virtual Status close(Status exec_status);
@@ -328,8 +325,6 @@ protected:
int _queue_level = 0;
int _core_id = 0;
- bool _try_close_flag = false;
-
RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index e00b1632eb4..a59f4ced6de 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -544,20 +544,21 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
    COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
// if the init failed, the _writer may be nullptr. so here need check
if (_writer) {
-        RETURN_IF_ERROR(_writer->get_writer_status());
+        Status st = _writer->get_writer_status();
+        if (exec_status.ok()) {
+            _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled")
+                                                       : Status::Cancelled("force close"));
+        } else {
+            _writer->force_close(exec_status);
+        }
+        // If there is an error in the process_block thread, we should get the writer
+        // status before calling force_close. For example, the thread may have failed
+        // while committing the transaction.
+        RETURN_IF_ERROR(st);
}
return Base::close(state, exec_status);
}
-template <typename Writer, typename Parent>
-    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status exec_status) {
-    if (state->is_cancelled() || !exec_status.ok()) {
-        _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
-    }
-    return Status::OK();
-}
-
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
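Both AsyncWriterSink::close above and the non-pipeline variant in async_writer_sink.h further down follow the same ordering: snapshot the writer status first, force_close, then propagate the snapshot. A runnable sketch of why the order matters, with simplified stand-ins for Doris' Status and writer types (the names are illustrative):

#include <iostream>
#include <string>
#include <utility>

struct Status {
    bool ok_ = true;
    std::string msg;
    bool ok() const { return ok_; }
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};

struct Writer {
    // Set by the async process_block thread, e.g. when a transaction commit fails.
    Status writer_status = Status::OK();
    void force_close(const Status&) { /* wake and stop the async thread */ }
};

// Snapshot the writer status *before* force_close: force_close may overwrite it,
// and the original failure is the error worth reporting to the caller.
Status close_sink(Writer* writer, const Status& exec_status, bool cancelled) {
    Status st = writer->writer_status; // snapshot first
    if (exec_status.ok()) {
        writer->force_close(cancelled ? Status::Error("Cancelled") : Status::Error("force close"));
    } else {
        writer->force_close(exec_status);
    }
    return st; // surface the pre-close failure, if any
}

int main() {
    Writer w;
    w.writer_status = Status::Error("commit transaction failed");
    std::cout << close_sink(&w, Status::OK(), false).msg << "\n"; // commit transaction failed
    return 0;
}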
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index ca1b224c5c5..98feb60ac22 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -387,7 +387,6 @@ public:
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
- virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
    [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;
@@ -512,10 +511,6 @@ public:
return Status::InternalError("Should not reach here!");
}
- Status try_close(RuntimeState* state) override {
- return Status::InternalError("Should not reach here!");
- }
-
bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
@@ -546,10 +541,6 @@ public:
        return state->get_sink_local_state(operator_id())->close(state, exec_status);
}
-    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
-        return state->get_sink_local_state(operator_id())->try_close(state, exec_status);
-    }
-
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"Runtime Profile is not owned by operator");
@@ -620,8 +611,6 @@ public:
Status open(RuntimeState* state) override { return Status::OK(); }
-    Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); }
-
Status close(RuntimeState* state, Status exec_status) override;
    [[nodiscard]] std::string debug_string(int indentation_level) const override;
@@ -704,8 +693,6 @@ public:
    Dependency* dependency() override { return _async_writer_dependency.get(); }
Status close(RuntimeState* state, Status exec_status) override;
- Status try_close(RuntimeState* state, Status exec_status) override;
-
    Dependency* finishdependency() override { return _finish_dependency.get(); }
protected:
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index cf4c312926e..aa580b49d48 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -133,12 +133,12 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout
: " << debug_string();
}
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            _is_report_on_cancel = false;
+        } else {
             for (auto& id : _fragment_instance_ids) {
                 LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
             }
-        } else {
-            _set_is_report_on_cancel(false); // TODO bug llj
         }
        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1b1c04eb814..c44570eb7c6 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -256,6 +256,7 @@ Status PipelineXTask::execute(bool* eos) {
}
}
+ Status status = Status::OK();
set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
@@ -287,7 +288,7 @@ Status PipelineXTask::execute(bool* eos) {
*eos = _data_state == SourceState::FINISHED;
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
- auto status = _sink->sink(_state, block, _data_state);
+ status = _sink->sink(_state, block, _data_state);
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
}
@@ -298,7 +299,7 @@ Status PipelineXTask::execute(bool* eos) {
}
}
- return Status::OK();
+ return status;
}
void PipelineXTask::finalize() {
@@ -312,16 +313,6 @@ void PipelineXTask::finalize() {
_le_state_map.clear();
}
-Status PipelineXTask::try_close(Status exec_status) {
- if (_try_close_flag) {
- return Status::OK();
- }
- _try_close_flag = true;
- Status status1 = _sink->try_close(_state, exec_status);
- Status status2 = _source->try_close(_state);
- return status1.ok() ? status2 : status1;
-}
-
Status PipelineXTask::close(Status exec_status) {
int64_t close_ns = 0;
Defer defer {[&]() {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 164b00a8d28..96069cbbea2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -67,9 +67,6 @@ public:
Status execute(bool* eos) override;
-    // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
-    // this task will enter the `PENDING_FINISH` state.
-    Status try_close(Status exec_status) override;
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status) override;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d253c6a589a..1f8a714a623 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -246,13 +246,14 @@ void TaskScheduler::_do_work(size_t index) {
bool canceled = fragment_ctx->is_canceled();
auto state = task->get_state();
+    // If the state is PENDING_FINISH, the task came from the blocked queue and its
+    // is_pending_finish now returns false: the task is finished and needs to be closed.
    if (state == PipelineTaskState::PENDING_FINISH) {
        DCHECK(task->is_pipelineX() || !task->is_pending_finish())
                << "must not pending close " << task->debug_string();
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-        _try_close_task(task,
-                        canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
-                        exec_status);
+        _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                    exec_status);
continue;
}
@@ -267,7 +268,7 @@ void TaskScheduler::_do_work(size_t index) {
            // errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
            Status cancel_status = fragment_ctx->get_query_ctx()->exec_status();
- _try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
+ _close_task(task, PipelineTaskState::CANCELED, cancel_status);
continue;
}
@@ -289,7 +290,12 @@ void TaskScheduler::_do_work(size_t index) {
}
task->set_previous_core_id(index);
- if (!status.ok()) {
+
+ if (status.is<ErrorCode::END_OF_FILE>()) {
+ // Sink operator finished, just close task now.
+ _close_task(task, PipelineTaskState::FINISHED, Status::OK());
+ continue;
+ } else if (!status.ok()) {
task->set_eos_time();
LOG(WARNING) << fmt::format(
"Pipeline task failed. query_id: {} reason: {}",
@@ -302,7 +308,7 @@ void TaskScheduler::_do_work(size_t index) {
// exec failed,cancel all fragment instance
            fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.msg());
- _try_close_task(task, PipelineTaskState::CANCELED, status);
+ _close_task(task, PipelineTaskState::CANCELED, status);
continue;
}
fragment_ctx->trigger_report_if_necessary();
@@ -316,10 +322,31 @@ void TaskScheduler::_do_work(size_t index) {
PrintInstanceStandardInfo(task->query_context()->query_id(),
task->fragment_context()->get_fragment_instance_id()),
fragment_ctx->is_canceled());
-            _try_close_task(task,
-                            fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
-                                                        : PipelineTaskState::FINISHED,
-                            status);
+ if (task->is_pipelineX()) {
+                // is_pending_finish adds the task to the dependency's blocking queue; the task
+                // will be moved back to the running queue once the dependency is ready.
+ if (task->is_pending_finish()) {
+                    // Only on eos should the task enter the PENDING_FINISH state.
+ task->set_state(PipelineTaskState::PENDING_FINISH);
+ task->set_running(false);
+ } else {
+ // Close the task directly?
+                    Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
+                    _close_task(task,
+                                canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                                exec_status);
+ }
+ } else {
+                // Only on eos should the task enter the PENDING_FINISH state.
+                // The pipeline engine handles this correctly: it checks is_pending_finish
+                // and re-invokes the task once it is ready.
+ task->set_state(PipelineTaskState::PENDING_FINISH);
+ task->set_running(false);
+                // After the task is added to the blocked queue, it may be run and released
+                // by another thread, which would crash when this thread later calls
+                // task->set_running.
+                static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+ }
continue;
}
@@ -343,39 +370,20 @@ void TaskScheduler::_do_work(size_t index) {
}
}
-void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
-                                    Status exec_status) {
+void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
// close_a_pipeline may delete fragment context and will core in some defer
// code, because the defer code will access fragment context it self.
auto lock_for_context = task->fragment_context()->shared_from_this();
-    auto status = task->try_close(exec_status);
-    auto cancel = [&]() {
+    // is_pending_finish does not check the status, so the status has to be checked in the
+    // close API. For example, in the async writer, the writer may fail while dealing with
+    // the eos block yet not return an error status, so close has to check the error status.
+    // All source and sink APIs have been refactored so that close no longer needs to wait
+    // for pending finish; it can therefore be called directly.
+    Status status = task->close(exec_status);
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
        task->query_context()->cancel(true, status.to_string(),
                                      Status::Cancelled(status.to_string()));
        state = PipelineTaskState::CANCELED;
-    };
-
-    auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
-    if (try_close_failed) {
-        cancel();
-    }
-    if (!task->is_pipelineX() && task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        // After the task is added to the block queue, it maybe run by another thread
-        // and the task maybe released in the other thread. And will core at
-        // task set running.
-        static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
-        task->set_running(false);
-        return;
-    } else if (task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        task->set_running(false);
-        return;
-    }
-
-    status = task->close(exec_status);
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
-        cancel();
    }
task->set_state(state);
task->set_close_pipeline_time();
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index af12376c5b7..41ac8c0c098 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -103,8 +103,8 @@ private:
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
void _do_work(size_t index);
-    // after _try_close_task, task maybe destructed.
-    void _try_close_task(PipelineTask* task, PipelineTaskState state,
-                         Status exec_status = Status::OK());
+    // after _close_task, the task may be destructed.
+    void _close_task(PipelineTask* task, PipelineTaskState state,
+                     Status exec_status = Status::OK());
};
} // namespace doris::pipeline
\ No newline at end of file
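With try_close gone, the scheduler's end-of-task decision collapses to two cases: park the task in PENDING_FINISH while dependencies (e.g. in-flight exchange RPCs) drain, or close it in one shot. A compilable toy model of that decision (the enum and task type are illustrative, not the real PipelineTask):

#include <iostream>

enum class TaskState { RUNNABLE, PENDING_FINISH, FINISHED };

struct Task {
    bool pending_finish = false; // e.g. the exchange sink buffer still has in-flight RPCs
    TaskState state = TaskState::RUNNABLE;
    bool is_pending_finish() const { return pending_finish; }
    void close() { /* release all resources; there is no separate try_close phase */ }
};

// Post-refactor end-of-task handling: either park until dependencies drain,
// or close immediately.
void on_task_eos(Task& task) {
    if (task.is_pending_finish()) {
        task.state = TaskState::PENDING_FINISH; // re-queued once dependencies are ready
    } else {
        task.close();
        task.state = TaskState::FINISHED;
    }
}

int main() {
    Task t;
    t.pending_finish = true;
    on_task_eos(t);
    std::cout << (t.state == TaskState::PENDING_FINISH) << "\n"; // 1
    return 0;
}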
diff --git a/be/src/runtime/task_execution_context.h b/be/src/runtime/task_execution_context.h
index c876ed5cb0d..3ea54177a05 100644
--- a/be/src/runtime/task_execution_context.h
+++ b/be/src/runtime/task_execution_context.h
@@ -47,6 +47,7 @@ struct HasTaskExecutionCtx {
public:
    inline TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }
+ inline Weak weak_task_exec_ctx() const { return task_exec_ctx_; }
private:
Weak task_exec_ctx_;
diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h
index 1260382d6fa..8105ff96573 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -105,7 +105,17 @@ public:
if (_writer) {
            // For pipeline engine, the writer is always closed in async thread process_block
if (state->enable_pipeline_exec()) {
-                RETURN_IF_ERROR(_writer->get_writer_status());
+                Status st = _writer->get_writer_status();
+                if (exec_status.ok()) {
+                    _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled")
+                                                               : Status::Cancelled("force close"));
+                } else {
+                    _writer->force_close(exec_status);
+                }
+                // If there is an error in the process_block thread, we should get the
+                // writer status before calling force_close. For example, the thread may
+                // have failed while committing the transaction.
+                RETURN_IF_ERROR(st);
} else {
RETURN_IF_ERROR(_writer->close(exec_status));
}
@@ -113,13 +123,6 @@ public:
return DataSink::close(state, exec_status);
}
-    Status try_close(RuntimeState* state, Status exec_status) override {
-        if (state->is_cancelled() || !exec_status.ok()) {
-            _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
-        }
-        return Status::OK();
-    }
-
    [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); }
protected:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 6c4d10839e0..79ffebcc21e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -282,6 +282,7 @@ Status Channel<Parent>::close_internal(Status exec_status) {
            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
}
} else {
+            // The non-pipeline engine sends an empty eos block.
status = send_remote_block((PBlock*)nullptr, true, exec_status);
}
}
@@ -632,17 +633,20 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
// 1. calculate range
// 2. dispatch rows to channel
}
-    return Status::OK();
-}
-Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
-    SCOPED_TIMER(_exec_timer);
-    _serializer.reset_block();
+    // If eos == true, then this is the last block; the channels should be closed in this step.
    Status final_st = Status::OK();
-    for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close(state, exec_status);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
+    // For the non-pipeline engine, there may still be a block left in the serializer to wait for.
+    if (eos && _enable_pipeline_exec) {
+        _serializer.reset_block();
+        for (int i = 0; i < _channels.size(); ++i) {
+            // For the non-pipeline engine, this API may hang waiting for the last rpc.
+            // For the pipeline engine, it adds the block to the exchange sink buffer
+            // and then enters the pending finish state.
+            Status st = _channels[i]->close(state, Status::OK());
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
        }
    }
    return final_st;
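The channel-close loop above (and its twin in ExchangeSinkOperatorX::sink) uses a first-error-wins aggregation: every channel is still closed, but only the first failure is reported. A small runnable sketch of that idiom with a simplified Status type (names are illustrative):

#include <iostream>
#include <string>
#include <vector>

struct Status {
    bool ok_ = true;
    std::string msg;
    bool ok() const { return ok_; }
    static Status OK() { return {}; }
};

struct Channel {
    Status close_status; // what close() will return
    Status close() { return close_status; }
};

// Close every channel, keep only the first non-OK status.
Status close_all(std::vector<Channel>& channels) {
    Status final_st = Status::OK();
    for (auto& ch : channels) {
        Status st = ch.close();
        if (!st.ok() && final_st.ok()) {
            final_st = st; // later failures do not overwrite the first one
        }
    }
    return final_st;
}

int main() {
    std::vector<Channel> channels(3);
    channels[1].close_status = {false, "rpc error"};
    channels[2].close_status = {false, "late error"};
    std::cout << close_all(channels).msg << "\n"; // rpc error
    return 0;
}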
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index ca020d9bab8..1224ded31ee 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -122,7 +122,7 @@ public:
Status open(RuntimeState* state) override;
Status send(RuntimeState* state, Block* block, bool eos = false) override;
- Status try_close(RuntimeState* state, Status exec_status) override;
+
Status close(RuntimeState* state, Status exec_status) override;
RuntimeState* state() { return _state; }
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 35d94fe3c47..471d4518724 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -67,8 +67,6 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
if (_dependency && !_data_queue_is_available() && !_is_finished()) {
_dependency->block();
}
- } else if (_eos && _data_queue.empty()) {
- status = Status::EndOfFile("Run out of sink data");
}
_cv.notify_one();
@@ -87,8 +85,20 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
}
void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
+    // Should be set to false here, before the async task is submitted.
+    _writer_thread_closed = false;
+    // This is an async thread; lock the task ctx to make sure the RuntimeState and
+    // profile are not destructed before the thread exits.
+    auto task_ctx = state->get_task_execution_context();
    static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-            [this, state, profile]() { this->process_block(state, profile); }));
+            [this, state, profile, task_ctx]() {
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    _writer_thread_closed = true;
+                    return;
+                }
+                this->process_block(state, profile);
+            }));
}
void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
@@ -131,16 +141,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
    // There is a unique ptr err_msg in Status, if it is modified, the unique ptr
    // maybe released. And it will core because use after free.
std::lock_guard l(_m);
+    // eos only means that the last block has been put into the queue and no more blocks
+    // will be added; it does not guarantee that the block has been written to the stream.
if (_writer_status.ok() && _eos) {
_writer_status = finish(state);
}
+    Status close_st = close(_writer_status);
+    // If the writer has already failed, do not update the writer status, so that we
+    // can surface the real reason for the failure.
    if (_writer_status.ok()) {
-        _writer_status = close(_writer_status);
-    } else {
-        // If it is already failed before, then not update the write status so that we could get
-        // the real reason.
-        static_cast<void>(close(_writer_status));
+        _writer_status = close_st;
    }
_writer_thread_closed = true;
if (_finish_dependency) {
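start_writer and process_block together form the new writer-thread lifecycle: the closed flag flips to false just before the task is submitted, the submitted lambda locks the weak task context before touching any task-owned state, and the thread marks itself closed on every exit path. A self-contained sketch of that lifecycle using std::thread in place of the fragment manager's thread pool (the names are illustrative, not Doris' actual API):

#include <iostream>
#include <memory>
#include <thread>

struct TaskExecutionContext {};

class AsyncWriter {
public:
    void start(std::weak_ptr<TaskExecutionContext> task_ctx) {
        _writer_thread_closed = false; // reset before the thread can run
        _thread = std::thread([this, task_ctx]() {
            auto task_lock = task_ctx.lock();
            if (task_lock == nullptr) {
                // Task already torn down: do no work, but still mark closed
                // so nothing waits on this writer.
                _writer_thread_closed = true;
                return;
            }
            // ... drain the block queue, then finish() and close() ...
            _writer_thread_closed = true;
        });
    }
    void join() { _thread.join(); }
    bool closed() const { return _writer_thread_closed; }

private:
    std::thread _thread;
    // Defaults to true: if the writer never starts (e.g. prepare failed),
    // nothing should pend on its finish.
    bool _writer_thread_closed = true;
};

int main() {
    auto ctx = std::make_shared<TaskExecutionContext>();
    AsyncWriter writer;
    writer.start(ctx);
    writer.join();
    std::cout << std::boolalpha << writer.closed() << "\n"; // true
    return 0;
}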
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 5fbcca98af3..8ed39aeb795 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -104,7 +104,9 @@ private:
std::deque<std::unique_ptr<Block>> _data_queue;
Status _writer_status = Status::OK();
bool _eos = false;
-    bool _writer_thread_closed = false;
+    // The writer is not started at the beginning. If prepare fails before open, the
+    // writer never starts, so we should not wait for pending finish on it.
+    bool _writer_thread_closed = true;
// Used by pipelineX
pipeline::AsyncWriterDependency* _dependency;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]