This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit ce5ba6164008ca78160de653384e0c15ca781ad5
Author: yiguolei <[email protected]>
AuthorDate: Tue Jan 23 13:21:17 2024 +0800

    [refactor](close) Full refactor async writer (#30082)

    ---------

    Co-authored-by: yiguolei <[email protected]>
---
 be/src/exec/data_sink.h                            |  4 --
 be/src/exec/exec_node.h                            |  4 --
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 53 ++++++++++----
 be/src/pipeline/exec/exchange_sink_buffer.h        |  5 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 34 +++++----
 be/src/pipeline/exec/exchange_sink_operator.h      |  2 -
 be/src/pipeline/exec/operator.h                    |  6 --
 be/src/pipeline/pipeline_fragment_context.cpp      |  7 +-
 be/src/pipeline/pipeline_fragment_context.h        |  1 -
 be/src/pipeline/pipeline_task.cpp                  | 13 +---
 be/src/pipeline/pipeline_task.h                    |  5 --
 be/src/pipeline/pipeline_x/operator.cpp            | 21 +++---
 be/src/pipeline/pipeline_x/operator.h              | 13 ----
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  6 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 15 +---
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  3 -
 be/src/pipeline/task_scheduler.cpp                 | 82 ++++++++++++----------
 be/src/pipeline/task_scheduler.h                   |  6 +-
 be/src/runtime/task_execution_context.h            |  1 +
 be/src/vec/sink/async_writer_sink.h                | 19 ++---
 be/src/vec/sink/vdata_stream_sender.cpp            | 22 +++---
 be/src/vec/sink/vdata_stream_sender.h              |  2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     | 27 ++++---
 be/src/vec/sink/writer/async_result_writer.h       |  4 +-
 24 files changed, 177 insertions(+), 178 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index be6cfe236b7..c0b27e8ae90 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -70,10 +70,6 @@ public:
         return send(state, block, eos);
     }
 
-    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
-        return Status::OK();
-    }
-
     [[nodiscard]] virtual bool is_pending_finish() const { return false; }
 
     // Releases all resources that were allocated in prepare()/send().
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 903122ecded..5a6b04667e7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -232,10 +232,6 @@ public:
 
     size_t children_count() const { return _children.size(); }
 
-    // when the fragment is normal finished, call this method to do some finish work
-    // such as send the last buffer to remote.
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
     std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }
 
 protected:
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 4484a34375b..9d240945be1 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -85,14 +85,15 @@ namespace pipeline {
 
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
-                                               int send_id, int be_number, QueryContext* context)
-        : _queue_capacity(0),
+                                               int send_id, int be_number, RuntimeState* state)
+        : HasTaskExecutionCtx(state),
+          _queue_capacity(0),
           _is_finishing(false),
           _query_id(query_id),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
           _be_number(be_number),
-          _context(context) {}
+          _context(state->get_query_ctx()) {}
 
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
@@ -270,12 +271,25 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
-        send_callback->addFailedHandler(
-                [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
+        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                const InstanceLoId& id, const std::string& err) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the handler any more.
+                return;
+            }
+            _failed(id, err);
+        });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
-        send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                             const PTransmitDataResult& result,
-                                             const int64_t& start_rpc_time) {
+        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                 const InstanceLoId& id, const bool& eos,
+                                                 const PTransmitDataResult& result,
+                                                 const int64_t& start_rpc_time) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the handler any more.
+                return;
+            }
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -332,12 +346,25 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
-        send_callback->addFailedHandler(
-                [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
+        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                const InstanceLoId& id, const std::string& err) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the handler any more.
+ return; + } + _failed(id, err); + }); send_callback->start_rpc_time = GetCurrentTimeNanos(); - send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result, - const int64_t& start_rpc_time) { + send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( + const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result, + const int64_t& start_rpc_time) { + auto task_lock = weak_task_ctx.lock(); + if (task_lock == nullptr) { + // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. + return; + } set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index a11b637f4d4..f0b55d528ec 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -196,9 +196,10 @@ struct ExchangeRpcContext { // Each ExchangeSinkOperator have one ExchangeSinkBuffer template <typename Parent> -class ExchangeSinkBuffer { +class ExchangeSinkBuffer : public HasTaskExecutionCtx { public: - ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, QueryContext*); + ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number, + RuntimeState* state); ~ExchangeSinkBuffer(); void register_sink(TUniqueId); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index aa29bb4cf0f..1f9ba3b4203 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -65,7 +65,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); _sink_buffer = std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>( - id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx()); + id, _dest_node_id, _sink->_sender_id, _state->be_number(), state); RETURN_IF_ERROR(DataSinkOperator::prepare(state)); _sink->register_pipeline_channels(_sink_buffer.get()); @@ -168,7 +168,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); _sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>( - id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx()); + id, p._dest_node_id, _sender_id, _state->be_number(), state); register_channels(_sink_buffer.get()); auto* _exchange_sink_dependency = _dependency; @@ -414,7 +414,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block // 1. calculate range // 2. 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index aa29bb4cf0f..1f9ba3b4203 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -65,7 +65,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
-            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx());
+            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state);
 
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->register_pipeline_channels(_sink_buffer.get());
@@ -168,7 +168,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());
+            id, p._dest_node_id, _sender_id, _state->be_number(), state);
     register_channels(_sink_buffer.get());
 
     auto* _exchange_sink_dependency = _dependency;
@@ -414,7 +414,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         // 1. calculate range
         // 2. dispatch rows to channel
     }
-    return Status::OK();
+
+    Status final_st = Status::OK();
+    if (source_state == SourceState::FINISHED) {
+        local_state._serializer.reset_block();
+        for (int i = 0; i < local_state.channels.size(); ++i) {
+            Status st = local_state.channels[i]->close(state, Status::OK());
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        local_state._sink_buffer->set_should_stop();
+        return final_st;
+    }
+    return final_st;
 }
 
 Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vectorized::Block* src,
@@ -488,21 +501,6 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
     return Status::OK();
 }
 
-Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) {
-    auto& local_state = get_local_state(state);
-    local_state._serializer.reset_block();
-    Status final_st = Status::OK();
-    Status final_status = exec_status;
-    for (int i = 0; i < local_state.channels.size(); ++i) {
-        Status st = local_state.channels[i]->close(state, exec_status);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
-        }
-    }
-    local_state._sink_buffer->set_should_stop();
-    return final_st;
-}
-
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 6d1d1b6a4fe..3e6486e34fd 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -217,8 +217,6 @@ public:
     Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
                            int num_receivers = 1);
 
-    Status try_close(RuntimeState* state, Status exec_status) override;
-
 private:
     friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index bf41c670e0c..8755872a979 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -239,8 +239,6 @@ public:
      */
     virtual bool is_pending_finish() const { return false; }
 
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
     bool is_closed() const { return _is_closed; }
 
     const OperatorBuilderBase* operator_builder() const { return _operator_builder; }
@@ -289,10 +287,6 @@ public:
         return Status::OK();
     }
 
-    Status try_close(RuntimeState* state) override {
-        return _sink->try_close(state, state->query_status());
-    }
-
     [[nodiscard]] bool is_pending_finish() const override { return _sink->is_pending_finish(); }
 
     Status close(RuntimeState* state) override {
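The sink() change above folds the old try_close() work into the normal data path:
once the source reports FINISHED, the operator resets the serializer, closes every
channel, and keeps only the first error it encounters. A minimal sketch of that
first-error aggregation (hypothetical Status/Channel types, not the Doris ones):

    #include <memory>
    #include <string>
    #include <vector>

    // Tiny stand-in for a status type: an empty message means OK.
    struct Status {
        std::string msg;
        bool ok() const { return msg.empty(); }
    };

    struct Channel {
        Status close() { return {}; /* flush buffers, send eos, ... */ }
    };

    // Close all channels, remembering only the first failure but still closing
    // the rest, mirroring "if (!st.ok() && final_st.ok())" in the diff.
    Status close_all(std::vector<std::unique_ptr<Channel>>& channels) {
        Status final_st;
        for (auto& ch : channels) {
            Status st = ch->close();
            if (!st.ok() && final_st.ok()) {
                final_st = st;  // remember only the first error
            }
        }
        return final_st;
    }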
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 909039b23fb..2341071d963 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -163,13 +163,12 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     // make result receiver on fe be stocked on rpc forever until timeout...
     // We need a more detail discussion.
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            _is_report_on_cancel = false;
+        } else {
             LOG(WARNING) << "PipelineFragmentContext "
                          << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
                          << " is canceled, cancel message: " << msg;
-
-        } else {
-            _set_is_report_on_cancel(false); // TODO bug llj fix this not projected by lock
         }
         _runtime_state->set_process_status(_query_ctx->exec_status());
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index a7a45d8f07f..0ec27c5054f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -158,7 +158,6 @@ protected:
     Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
     virtual void _close_fragment_instance();
     void _init_next_report_time();
-    void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
     // Id of this query
     TUniqueId _query_id;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index d87c477a8db..9b72762ebf9 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -301,17 +301,8 @@ Status PipelineTask::execute(bool* eos) {
             _finish_p_dependency();
         }
 
-    return Status::OK();
-}
-
-Status PipelineTask::try_close(Status exec_status) {
-    if (_try_close_flag) {
-        return Status::OK();
-    }
-    _try_close_flag = true;
-    Status status1 = _sink->try_close(_state);
-    Status status2 = _source->try_close(_state);
-    return status1.ok() ? status2 : status1;
+    // If the status is EOF (the sink node returns EOF if the downstream fragment has finished), then return it.
+    return status;
 }
 
 Status PipelineTask::close(Status exec_status) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 56e42370ff2..dd512293e05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -132,9 +132,6 @@ public:
 
     virtual Status execute(bool* eos);
 
-    // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
-    // this task will enter the `PENDING_FINISH` state.
-    virtual Status try_close(Status exec_status);
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
     virtual Status close(Status exec_status);
@@ -328,8 +325,6 @@ protected:
     int _queue_level = 0;
    int _core_id = 0;
 
-    bool _try_close_flag = false;
-
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index e00b1632eb4..a59f4ced6de 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -544,20 +544,21 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
     COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
     // if the init failed, the _writer may be nullptr. so here need check
     if (_writer) {
-        RETURN_IF_ERROR(_writer->get_writer_status());
+        Status st = _writer->get_writer_status();
+        if (exec_status.ok()) {
+            _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled")
+                                                       : Status::Cancelled("force close"));
+        } else {
+            _writer->force_close(exec_status);
+        }
+        // If there is an error in the process_block thread, we should get the
+        // writer status before calling force_close. For example, the thread may
+        // have failed to commit the transaction.
+        RETURN_IF_ERROR(st);
     }
     return Base::close(state, exec_status);
 }
 
-template <typename Writer, typename Parent>
-    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status exec_status) {
-    if (state->is_cancelled() || !exec_status.ok()) {
-        _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
-    }
-    return Status::OK();
-}
-
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index ca1b224c5c5..98feb60ac22 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -387,7 +387,6 @@ public:
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
-    virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;
 
@@ -512,10 +511,6 @@ public:
         return Status::InternalError("Should not reach here!");
     }
 
-    Status try_close(RuntimeState* state) override {
-        return Status::InternalError("Should not reach here!");
-    }
-
     bool can_read() override {
         LOG(FATAL) << "should not reach here!";
         return false;
@@ -546,10 +541,6 @@ public:
         return state->get_sink_local_state(operator_id())->close(state, exec_status);
     }
 
-    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
-        return state->get_sink_local_state(operator_id())->try_close(state, exec_status);
-    }
-
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Runtime Profile is not owned by operator");
@@ -620,8 +611,6 @@ public:
 
     Status open(RuntimeState* state) override { return Status::OK(); }
 
-    Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); }
-
     Status close(RuntimeState* state, Status exec_status) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level) const override;
@@ -704,8 +693,6 @@ public:
     Dependency* dependency() override { return _async_writer_dependency.get(); }
     Status close(RuntimeState* state, Status exec_status) override;
 
-    Status try_close(RuntimeState* state, Status exec_status) override;
-
     Dependency* finishdependency() override { return _finish_dependency.get(); }
 
 protected:
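PipelineTask::execute() now returns the sink's status instead of swallowing it, so
an END_OF_FILE raised by the sink (the downstream fragment already finished) can
reach the scheduler, which treats it as graceful completion rather than a failure.
A toy model of that contract (generic codes, not Doris's Status/ErrorCode):

    #include <cassert>

    enum class Code { OK, END_OF_FILE, ERROR };

    // The execute loop simply forwards whatever the sink returned.
    Code execute_once(Code sink_result) { return sink_result; }

    // Scheduler side: EOF means "downstream is done", so the task is closed as
    // FINISHED; only other non-OK codes cancel the whole query.
    bool is_graceful(Code c) { return c == Code::OK || c == Code::END_OF_FILE; }

    int main() {
        assert(is_graceful(execute_once(Code::END_OF_FILE)));
        assert(!is_graceful(execute_once(Code::ERROR)));
    }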
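The AsyncWriterSink::close() rewrite above is careful about ordering: it snapshots
the writer's status before calling force_close(), because force_close() stamps its
own reason onto the writer and would otherwise mask the original failure (for
example a failed transaction commit in the background thread). A sketch of the
ordering with a hypothetical writer type:

    #include <string>

    struct Status {
        std::string msg;
        bool ok() const { return msg.empty(); }
    };

    struct Writer {
        Status status;  // set by the background process_block thread
        void force_close(Status s) {
            if (status.ok()) status = s;  // force_close writes its own reason
        }
    };

    // Snapshot first, then force_close, then report the snapshot: the caller
    // sees the real first error instead of the generic "force close" reason.
    Status close_writer(Writer& w, const Status& exec_status) {
        Status st = w.status;  // 1. snapshot the writer's own status
        w.force_close(exec_status.ok() ? Status{"force close"} : exec_status);
        return st;             // 2. surface the original error, if any
    }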
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index cf4c312926e..aa580b49d48 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -133,12 +133,12 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
     }
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            _is_report_on_cancel = false;
+        } else {
             for (auto& id : _fragment_instance_ids) {
                 LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
             }
-        } else {
-            _set_is_report_on_cancel(false); // TODO bug llj
         }
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1b1c04eb814..c44570eb7c6 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -256,6 +256,7 @@ Status PipelineXTask::execute(bool* eos) {
         }
     }
 
+    Status status = Status::OK();
     set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
         if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
@@ -287,7 +288,7 @@ Status PipelineXTask::execute(bool* eos) {
             *eos = _data_state == SourceState::FINISHED;
             if (_block->rows() != 0 || *eos) {
                 SCOPED_TIMER(_sink_timer);
-                auto status = _sink->sink(_state, block, _data_state);
+                status = _sink->sink(_state, block, _data_state);
                 if (!status.is<ErrorCode::END_OF_FILE>()) {
                     RETURN_IF_ERROR(status);
                 }
@@ -298,7 +299,7 @@ Status PipelineXTask::execute(bool* eos) {
         }
     }
 
-    return Status::OK();
+    return status;
 }
 
 void PipelineXTask::finalize() {
@@ -312,16 +313,6 @@ void PipelineXTask::finalize() {
     _le_state_map.clear();
 }
 
-Status PipelineXTask::try_close(Status exec_status) {
-    if (_try_close_flag) {
-        return Status::OK();
-    }
-    _try_close_flag = true;
-    Status status1 = _sink->try_close(_state, exec_status);
-    Status status2 = _source->try_close(_state);
-    return status1.ok() ? status2 : status1;
-}
-
 Status PipelineXTask::close(Status exec_status) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 164b00a8d28..96069cbbea2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -67,9 +67,6 @@ public:
 
     Status execute(bool* eos) override;
 
-    // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
-    // this task will enter the `PENDING_FINISH` state.
-    Status try_close(Status exec_status) override;
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status) override;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d253c6a589a..1f8a714a623 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -246,13 +246,14 @@ void TaskScheduler::_do_work(size_t index) {
         bool canceled = fragment_ctx->is_canceled();
 
         auto state = task->get_state();
+        // If the state is PENDING_FINISH, the task came from the blocked queue and its
+        // is_pending_finish() must now return false: the task is finished and needs to be closed.
         if (state == PipelineTaskState::PENDING_FINISH) {
             DCHECK(task->is_pipelineX() || !task->is_pending_finish())
                     << "must not pending close " << task->debug_string();
             Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _try_close_task(task,
-                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
-                            exec_status);
+            _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                        exec_status);
             continue;
         }
@@ -267,7 +268,7 @@ void TaskScheduler::_do_work(size_t index) {
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
             Status cancel_status = fragment_ctx->get_query_ctx()->exec_status();
-            _try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
+            _close_task(task, PipelineTaskState::CANCELED, cancel_status);
             continue;
         }
@@ -289,7 +290,12 @@ void TaskScheduler::_do_work(size_t index) {
         }
 
         task->set_previous_core_id(index);
-        if (!status.ok()) {
+
+        if (status.is<ErrorCode::END_OF_FILE>()) {
+            // The sink operator finished, so just close the task now.
+            _close_task(task, PipelineTaskState::FINISHED, Status::OK());
+            continue;
+        } else if (!status.ok()) {
             task->set_eos_time();
             LOG(WARNING) << fmt::format(
                     "Pipeline task failed. query_id: {} reason: {}",
@@ -302,7 +308,7 @@ void TaskScheduler::_do_work(size_t index) {
 
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.msg());
-            _try_close_task(task, PipelineTaskState::CANCELED, status);
+            _close_task(task, PipelineTaskState::CANCELED, status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -316,10 +322,31 @@ void TaskScheduler::_do_work(size_t index) {
                     PrintInstanceStandardInfo(task->query_context()->query_id(),
                                               task->fragment_context()->get_fragment_instance_id()),
                     fragment_ctx->is_canceled());
-            _try_close_task(task,
-                            fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
-                                                        : PipelineTaskState::FINISHED,
-                            status);
+            if (task->is_pipelineX()) {
+                // is_pending_finish() adds the task to the dependency's blocking queue; the
+                // task is moved back to the running queue once the dependency is ready.
+                if (task->is_pending_finish()) {
+                    // EOS was reached, so move the task to the PENDING_FINISH state.
+                    task->set_state(PipelineTaskState::PENDING_FINISH);
+                    task->set_running(false);
+                } else {
+                    // Nothing is pending to finish, so close the task directly.
+                    Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
+                    _close_task(
+                            task,
+                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                            exec_status);
+                }
+            } else {
+                // EOS was reached, so move the task to the PENDING_FINISH state. This is
+                // safe for the pipeline engine: it checks is_pending_finish() and invokes
+                // the task again once it is ready.
+                task->set_state(PipelineTaskState::PENDING_FINISH);
+                task->set_running(false);
+                // After the task is added to the blocked queue it may be run by another
+                // thread and released there, which would crash when this thread then
+                // calls set_running().
+                static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+            }
             continue;
         }
@@ -343,39 +370,20 @@ void TaskScheduler::_do_work(size_t index) {
     }
 }
 
-void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
-                                    Status exec_status) {
+void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
     // close_a_pipeline may delete fragment context and will core in some defer
     // code, because the defer code will access fragment context it self.
     auto lock_for_context = task->fragment_context()->shared_from_this();
-    auto status = task->try_close(exec_status);
-    auto cancel = [&]() {
+    // is_pending_finish() does not check the status, so the status has to be checked in the
+    // close API. For example, the async writer may fail while handling the EOS block without
+    // returning an error status; that error has to be surfaced in close().
+    // All source and sink APIs have been refactored so that the close API no longer needs
+    // to wait for pending finish, so close can be called directly here.
+    Status status = task->close(exec_status);
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
         task->query_context()->cancel(true, status.to_string(),
                                       Status::Cancelled(status.to_string()));
         state = PipelineTaskState::CANCELED;
-    };
-
-    auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
-    if (try_close_failed) {
-        cancel();
-    }
-    if (!task->is_pipelineX() && task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        // After the task is added to the block queue, it maybe run by another thread
-        // and the task maybe released in the other thread. And will core at
-        // task set running.
-        static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
-        task->set_running(false);
-        return;
-    } else if (task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        task->set_running(false);
-        return;
-    }
-
-    status = task->close(exec_status);
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
-        cancel();
     }
     task->set_state(state);
     task->set_close_pipeline_time();
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index af12376c5b7..41ac8c0c098 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -103,8 +103,8 @@ private:
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
     void _do_work(size_t index);
-    // after _try_close_task, task maybe destructed.
-    void _try_close_task(PipelineTask* task, PipelineTaskState state,
-                         Status exec_status = Status::OK());
+    // After _close_task, the task may already be destructed.
+    void _close_task(PipelineTask* task, PipelineTaskState state,
+                     Status exec_status = Status::OK());
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/task_execution_context.h b/be/src/runtime/task_execution_context.h
index c876ed5cb0d..3ea54177a05 100644
--- a/be/src/runtime/task_execution_context.h
+++ b/be/src/runtime/task_execution_context.h
@@ -47,6 +47,7 @@ struct HasTaskExecutionCtx {
 
 public:
     inline TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }
+    inline Weak weak_task_exec_ctx() const { return task_exec_ctx_; }
 
 private:
     Weak task_exec_ctx_;
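With try_close() gone, _close_task() is the single exit path for a task: it calls
close() and, if close itself fails on a task that was not already canceled, it
escalates to a query-level cancel so the failure is not silently dropped. A
simplified sketch of that escalation (toy types, not the scheduler's real
signatures):

    #include <iostream>
    #include <string>

    struct Status {
        std::string msg;
        bool ok() const { return msg.empty(); }
    };

    enum class TaskState { FINISHED, CANCELED };

    struct Query {
        void cancel(const std::string& reason) { std::cout << "cancel: " << reason << "\n"; }
    };

    // A close failure on a non-canceled task flips it to CANCELED and cancels
    // the whole query, mirroring _close_task in the diff above.
    TaskState close_task(Query& q, const Status& close_status, TaskState state) {
        if (!close_status.ok() && state != TaskState::CANCELED) {
            q.cancel(close_status.msg);
            state = TaskState::CANCELED;
        }
        return state;
    }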
diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h
index 1260382d6fa..8105ff96573 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -105,7 +105,17 @@ public:
         if (_writer) {
             // For pipeline engine, the writer is always closed in async thread process_block
             if (state->enable_pipeline_exec()) {
-                RETURN_IF_ERROR(_writer->get_writer_status());
+                Status st = _writer->get_writer_status();
+                if (exec_status.ok()) {
+                    _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled")
+                                                               : Status::Cancelled("force close"));
+                } else {
+                    _writer->force_close(exec_status);
+                }
+                // If there is an error in the process_block thread, we should get the
+                // writer status before calling force_close. For example, the thread may
+                // have failed to commit the transaction.
+                RETURN_IF_ERROR(st);
             } else {
                 RETURN_IF_ERROR(_writer->close(exec_status));
             }
@@ -113,13 +123,6 @@ public:
         return DataSink::close(state, exec_status);
     }
 
-    Status try_close(RuntimeState* state, Status exec_status) override {
-        if (state->is_cancelled() || !exec_status.ok()) {
-            _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
-        }
-        return Status::OK();
-    }
-
     [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); }
 
 protected:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 6c4d10839e0..79ffebcc21e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -282,6 +282,7 @@ Status Channel<Parent>::close_internal(Status exec_status) {
             _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
         }
     } else {
+        // The non-pipeline engine sends an empty EOS block.
         status = send_remote_block((PBlock*)nullptr, true, exec_status);
     }
 }
@@ -632,17 +633,20 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         // 1. calculate range
         // 2. dispatch rows to channel
     }
-    return Status::OK();
-}
 
-Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
-    SCOPED_TIMER(_exec_timer);
-    _serializer.reset_block();
+    // If eos == true this is the last block, so the channels should be closed in this step.
     Status final_st = Status::OK();
-    for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close(state, exec_status);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
+    // For the non-pipeline engine there may still be a block in the serializer to wait for.
+    if (eos && _enable_pipeline_exec) {
+        _serializer.reset_block();
+        for (int i = 0; i < _channels.size(); ++i) {
+            // For the non-pipeline engine this call may hang waiting for the last RPC.
+            // For the pipeline engine it adds the block to the exchange sink buffer and
+            // then enters the pending-finish state.
+            Status st = _channels[i]->close(state, Status::OK());
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
         }
     }
     return final_st;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index ca020d9bab8..1224ded31ee 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -122,7 +122,7 @@ public:
     Status open(RuntimeState* state) override;
 
     Status send(RuntimeState* state, Block* block, bool eos = false) override;
-    Status try_close(RuntimeState* state, Status exec_status) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
     RuntimeState* state() { return _state; }
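As the close_internal() comment above notes, the non-pipeline engine terminates a
stream by sending one final empty block flagged eos, and the receiver tears the
channel down when the flag arrives. A minimal sketch of that convention (toy
in-process message type, not Doris's PBlock RPC):

    #include <optional>
    #include <queue>
    #include <vector>

    struct Message {
        std::optional<std::vector<char>> payload;  // empty for the final marker
        bool eos = false;
    };

    struct Receiver {
        std::queue<Message> inbox;
        bool closed = false;

        // Consume messages until the eos marker arrives; the marker carries no
        // data, it only signals "no more blocks on this channel".
        void drain() {
            while (!inbox.empty()) {
                Message m = std::move(inbox.front());
                inbox.pop();
                if (m.eos) {
                    closed = true;  // remote side finished; release the channel
                    break;
                }
                // ... process m.payload ...
            }
        }
    };

    // Sender side: closing the channel means pushing an empty eos message.
    void close_channel(Receiver& r) { r.inbox.push(Message{std::nullopt, true}); }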
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 35d94fe3c47..471d4518724 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -67,8 +67,6 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
         if (_dependency && !_data_queue_is_available() && !_is_finished()) {
             _dependency->block();
         }
-    } else if (_eos && _data_queue.empty()) {
-        status = Status::EndOfFile("Run out of sink data");
     }
 
     _cv.notify_one();
@@ -87,8 +85,20 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
 }
 
 void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
+    // Should be set to false here, before the writer thread is submitted.
+    _writer_thread_closed = false;
+    // This is an async thread; lock the task ctx to make sure the RuntimeState and the
+    // profile are not destructed before the thread exits.
+    auto task_ctx = state->get_task_execution_context();
     static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-            [this, state, profile]() { this->process_block(state, profile); }));
+            [this, state, profile, task_ctx]() {
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    _writer_thread_closed = true;
+                    return;
+                }
+                this->process_block(state, profile);
+            }));
 }
 
 void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
@@ -131,16 +141,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
     // There is a unique ptr err_msg in Status, if it is modified, the unique ptr
     // maybe released. And it will core because use after free.
     std::lock_guard l(_m);
+    // EOS only means the last block has been put into the queue and no more blocks will be
+    // added; it does not guarantee that the block has been written to the stream.
    if (_writer_status.ok() && _eos) {
         _writer_status = finish(state);
     }
 
+    Status close_st = close(_writer_status);
+    // If it already failed earlier, do not update the writer status, so that we keep the
+    // real failure reason.
     if (_writer_status.ok()) {
-        _writer_status = close(_writer_status);
-    } else {
-        // If it is already failed before, then not update the write status so that we could get
-        // the real reason.
-        static_cast<void>(close(_writer_status));
+        _writer_status = close_st;
     }
     _writer_thread_closed = true;
     if (_finish_dependency) {
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 5fbcca98af3..8ed39aeb795 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -104,7 +104,9 @@ private:
     std::deque<std::unique_ptr<Block>> _data_queue;
     Status _writer_status = Status::OK();
     bool _eos = false;
-    bool _writer_thread_closed = false;
+    // The writer is not started at the beginning. If prepare failed and open was never
+    // called, the writer never starts, so we should not wait for pending finish on it.
+    bool _writer_thread_closed = true;
 
     // Used by pipelineX
     pipeline::AsyncWriterDependency* _dependency;
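The start_writer()/process_block() changes combine two ideas: the spawned thread
locks a weak task context before touching RuntimeState, and _writer_thread_closed
now defaults to true so that a writer whose thread never started is never waited
on. A condensed sketch of that lifecycle (std::thread stands in for the fragment
manager's thread pool, std::atomic stands in for the synchronization the real
writer does under its mutex, and the names are illustrative):

    #include <atomic>
    #include <memory>
    #include <thread>

    struct TaskExecutionCtx {};  // owns RuntimeState/profile in the real code

    struct AsyncWriter {
        // Defaults to true: if prepare fails before start_writer() runs, nobody
        // should wait on a thread that was never spawned.
        std::atomic<bool> writer_thread_closed {true};

        void start_writer(const std::shared_ptr<TaskExecutionCtx>& ctx) {
            writer_thread_closed = false;  // mark the thread live before submit
            std::thread([this, weak_ctx = std::weak_ptr<TaskExecutionCtx>(ctx)] {
                auto lock = weak_ctx.lock();
                if (lock == nullptr) {
                    // Task already gone: don't touch RuntimeState, just mark done.
                    writer_thread_closed = true;
                    return;
                }
                process_blocks();  // drain the queue, then finish() and close()
                writer_thread_closed = true;
            }).detach();
        }

        bool is_pending_finish() const { return !writer_thread_closed; }

    private:
        void process_blocks() { /* write queued blocks, finish(), close() */ }
    };

As in the commit, the epilogue follows a "first error wins" rule: close() is always
called, but the writer status is only overwritten by close()'s result when the
writer had not already failed.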
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]