This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0d28a05abd4bf7d5e320c8c98a32ea7761047a05
Author: HappenLee <[email protected]>
AuthorDate: Fri Jun 14 10:45:25 2024 +0800

    [Exec](agg) Remove unless opt in agg sink operator (#33691)

    Remove unless opt in agg sink operator for topn agg limit
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 26 ++++++++----------------
 be/src/pipeline/pipeline_fragment_context.cpp      |  3 ++-
 2 files changed, 9 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 8e34de9bf98..bbca04352ca 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -498,22 +498,13 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
 
     if (_should_limit_output && !Base::_shared_state->enable_spill) {
         const size_t hash_table_size = _get_hash_table_size();
-        if (Base::_parent->template cast<AggSinkOperatorX>()._can_short_circuit) {
-            _shared_state->reach_limit =
-                    hash_table_size >=
-                    Base::_parent->template cast<AggSinkOperatorX>()._limit;
-            if (_shared_state->reach_limit) {
-                Base::_dependency->set_ready_to_read();
-                return Status::Error<ErrorCode::END_OF_FILE>("");
-            }
-        } else {
-            _shared_state->reach_limit =
-                    hash_table_size >= _shared_state->do_sort_limit
-                            ? Base::_parent->template cast<AggSinkOperatorX>()._limit * 5
-                            : Base::_parent->template cast<AggSinkOperatorX>()._limit;
-            if (_shared_state->reach_limit && _shared_state->do_sort_limit) {
-                _shared_state->build_limit_heap(hash_table_size);
-            }
+
+        _shared_state->reach_limit =
+                hash_table_size >= _shared_state->do_sort_limit
+                        ? Base::_parent->template cast<AggSinkOperatorX>()._limit * 5
+                        : Base::_parent->template cast<AggSinkOperatorX>()._limit;
+        if (_shared_state->reach_limit && _shared_state->do_sort_limit) {
+            _shared_state->build_limit_heap(hash_table_size);
         }
     }
 }
@@ -741,9 +732,6 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
 
     // init aggregate functions
     _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
-    // In case of : `select * from (select GoodEvent from hits union select CounterID from hits) as h limit 10;`
-    // only union with limit: we can short circuit query the pipeline exec engine.
-    _can_short_circuit = tnode.agg_node.aggregate_functions.empty();
 
     TSortInfo dummy;
     for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 2b765afd77d..17466ae1069 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1189,7 +1189,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         if (tnode.agg_node.aggregate_functions.empty() && !_runtime_state->enable_agg_spill() &&
             request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
-            !tnode.agg_node.grouping_exprs.empty()) {
+            !tnode.agg_node.grouping_exprs.empty() &&
+            !tnode.agg_node.__isset.agg_sort_info_by_group_key) {
             op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                        _require_bucket_distribution));
             _require_bucket_distribution =

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
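For readers skimming the first hunk: with the `_can_short_circuit` branch removed, the sink's limit check collapses to a single path. The sketch below is only a readability paraphrase of that retained logic under the assumption that the ternary chooses the threshold (limit vs. 5x limit); `LimitSketch` and `check_limit` are illustrative names, not the actual `AggSinkLocalState`/`AggSinkOperatorX` interfaces, which reach the same fields through the parent operator and shared state shown in the diff.

```cpp
// Minimal sketch of the limit check kept by this commit, paraphrased from the
// hunk above. Names and the free-function shape are illustrative only.
#include <cstddef>
#include <cstdio>

struct LimitSketch {
    bool reach_limit = false;
    bool do_sort_limit = false; // topn-agg sort-limit path enabled?

    // Stand-in for _shared_state->build_limit_heap(hash_table_size).
    void build_limit_heap(std::size_t hash_table_size) {
        std::printf("building limit heap over %zu groups\n", hash_table_size);
    }
};

// After the change there is no short-circuit branch: the sink only decides
// whether the hash table has grown past the (possibly relaxed) limit.
void check_limit(LimitSketch& state, std::size_t hash_table_size, std::size_t limit) {
    // With the sort-limit path on, let the table grow to 5x the limit before
    // flagging it; otherwise compare against the limit directly.
    const std::size_t threshold = state.do_sort_limit ? limit * 5 : limit;
    state.reach_limit = hash_table_size >= threshold;
    if (state.reach_limit && state.do_sort_limit) {
        state.build_limit_heap(hash_table_size);
    }
}

int main() {
    LimitSketch state;
    state.do_sort_limit = true;
    check_limit(state, /*hash_table_size=*/600, /*limit=*/100); // 600 >= 500 -> heap built
    return 0;
}
```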
