This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 64482b2e7c7e4a4aaa5cda4f5b5d7f7b3b678168 Author: Gabriel <[email protected]> AuthorDate: Tue May 28 10:31:31 2024 +0800 [fix](pipeline) Fix query hang if limited rows is reached (#35466) ## Proposed changes Some operators have a limit condition, so the source operator should notify the sink operator when the limit is reached. Although the FE has limit logic, the limit is not always sent. ## Further comments If this is a relatively large or complex change, kick off the discussion at [[email protected]](mailto:[email protected]) by explaining why you chose the solution you did and what alternatives you considered, etc... --- be/src/pipeline/dependency.cpp | 7 +++++++ be/src/pipeline/dependency.h | 5 +++++ .../local_exchange/local_exchange_sink_operator.cpp | 11 +++++++++-- .../local_exchange/local_exchange_source_operator.cpp | 17 +++++++++++++++-- .../local_exchange/local_exchange_source_operator.h | 1 + be/src/pipeline/local_exchange/local_exchanger.h | 4 ++++ 6 files changed, 41 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index b8c55c7c68d..8cf025274af 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -184,6 +184,13 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } } +void LocalExchangeSharedState::sub_running_source_operators() { + std::unique_lock<std::mutex> lc(le_lock); + if (exchanger->_running_source_operators.fetch_sub(1) == 1) { + _set_always_ready(); + } +} + LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) { source_deps.resize(num_instances, nullptr); mem_trackers.resize(num_instances, nullptr); diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 580cb8368c8..cdc0eec3933 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -759,11 +759,16 @@ public: } }; void sub_running_sink_operators(); + void sub_running_source_operators(); void _set_always_ready() { for (auto& dep : source_deps) { 
DCHECK(dep); dep->set_always_ready(); } + for (auto& dep : sink_deps) { + DCHECK(dep); + dep->set_always_ready(); + } } Dependency* get_dep_by_channel_id(int channel_id) { return source_deps[channel_id].get(); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 043b66b2e08..39ebbe27d88 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -65,10 +65,11 @@ std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) con fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _release_count: {}", + "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _release_count); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators, + _release_count); return fmt::to_string(debug_string_buffer); } @@ -79,6 +80,12 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state)); + // If all exchange sources ended due to limit reached, current task should also finish + if (local_state._exchanger->_running_source_operators == 0) { + local_state._release_count = true; + local_state._shared_state->sub_running_sink_operators(); + return Status::EndOfFile("receiver eof"); + } if (eos) { local_state._shared_state->sub_running_sink_operators(); local_state._release_count = true; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 1ee48e54a44..ea9f7861dbf 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -47,13 +47,26 @@ Status LocalExchangeSourceLocalState::open(RuntimeState* state) { return Status::OK(); } +Status LocalExchangeSourceLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + + if (_shared_state) { + _shared_state->sub_running_source_operators(); + } + + return Base::close(state); +} + std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}", + "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " + "_running_sink_operators: {}, _running_source_operators: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index 9413b60d03b..ec662178dea 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -36,6 +36,7 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; + Status close(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; private: diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 1a47a1f36f5..476f479e11e 100644 
--- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -30,12 +30,14 @@ class Exchanger { public: Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) : _running_sink_operators(running_sink_operators), + _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_partitions), _free_block_limit(free_block_limit) {} Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : _running_sink_operators(running_sink_operators), + _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_sources), @@ -51,8 +53,10 @@ protected: friend struct LocalExchangeSharedState; friend struct ShuffleBlockWrapper; friend class LocalExchangeSourceLocalState; + friend class LocalExchangeSinkOperatorX; friend class LocalExchangeSinkLocalState; std::atomic<int> _running_sink_operators = 0; + std::atomic<int> _running_source_operators = 0; const int _num_partitions; const int _num_senders; const int _num_sources; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
