This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0d28a05abd4bf7d5e320c8c98a32ea7761047a05
Author: HappenLee <[email protected]>
AuthorDate: Fri Jun 14 10:45:25 2024 +0800

    [Exec](agg) Remove useless opt in agg sink operator (#33691)
    
     Remove useless opt in agg sink operator for topn agg limit
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 26 ++++++----------------
 be/src/pipeline/pipeline_fragment_context.cpp      |  3 ++-
 2 files changed, 9 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 8e34de9bf98..bbca04352ca 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -498,22 +498,13 @@ Status 
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
 
             if (_should_limit_output && !Base::_shared_state->enable_spill) {
                 const size_t hash_table_size = _get_hash_table_size();
-                if (Base::_parent->template 
cast<AggSinkOperatorX>()._can_short_circuit) {
-                    _shared_state->reach_limit =
-                            hash_table_size >=
-                            Base::_parent->template 
cast<AggSinkOperatorX>()._limit;
-                    if (_shared_state->reach_limit) {
-                        Base::_dependency->set_ready_to_read();
-                        return Status::Error<ErrorCode::END_OF_FILE>("");
-                    }
-                } else {
-                    _shared_state->reach_limit =
-                            hash_table_size >= _shared_state->do_sort_limit
-                                    ? Base::_parent->template 
cast<AggSinkOperatorX>()._limit * 5
-                                    : Base::_parent->template 
cast<AggSinkOperatorX>()._limit;
-                    if (_shared_state->reach_limit && 
_shared_state->do_sort_limit) {
-                        _shared_state->build_limit_heap(hash_table_size);
-                    }
+
+                _shared_state->reach_limit =
+                        hash_table_size >= _shared_state->do_sort_limit
+                                ? Base::_parent->template 
cast<AggSinkOperatorX>()._limit * 5
+                                : Base::_parent->template 
cast<AggSinkOperatorX>()._limit;
+                if (_shared_state->reach_limit && 
_shared_state->do_sort_limit) {
+                    _shared_state->build_limit_heap(hash_table_size);
                 }
             }
         }
@@ -741,9 +732,6 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
     // init aggregate functions
     _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
-    // In case of : `select * from (select GoodEvent from hits union select 
CounterID from hits) as h limit 10;`
-    // only union with limit: we can short circuit query the pipeline exec 
engine.
-    _can_short_circuit = tnode.agg_node.aggregate_functions.empty();
 
     TSortInfo dummy;
     for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 2b765afd77d..17466ae1069 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1189,7 +1189,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         if (tnode.agg_node.aggregate_functions.empty() && 
!_runtime_state->enable_agg_spill() &&
             
request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
-            !tnode.agg_node.grouping_exprs.empty()) {
+            !tnode.agg_node.grouping_exprs.empty() &&
+            !tnode.agg_node.__isset.agg_sort_info_by_group_key) {
             op.reset(new DistinctStreamingAggOperatorX(pool, 
next_operator_id(), tnode, descs,
                                                        
_require_bucket_distribution));
             _require_bucket_distribution =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to