This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a8f509c7fe66a93b0d21bf996a9fb55e04dce1a2
Author: TengJianPing <[email protected]>
AuthorDate: Mon May 27 17:07:34 2024 +0800

    [fix] fix wrong result of spill agg with limit (#35403)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  5 ++-
 .../pipeline/exec/aggregation_source_operator.cpp  |  5 ---
 be/src/pipeline/exec/aggregation_source_operator.h |  3 --
 regression-test/data/spill_p0/aggregate_spill.out  |  4 ++
 .../suites/spill_p0/aggregate_spill.groovy         | 43 ++++++++++++++++++++++
 5 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index d79623ba4c3..67133187101 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -131,7 +131,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
 
         _should_limit_output = p._limit != -1 &&       // has limit
                                (!p._have_conjuncts) && // no having conjunct
-                               p._needs_finalize;      // agg's finalize step
+                               p._needs_finalize &&    // agg's finalize step
+                               !Base::_shared_state->enable_spill;
     }
     for (auto& evaluator : p._aggregate_evaluators) {
         Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
@@ -462,7 +463,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
                     _places.data(), _agg_arena_pool));
         }
 
-        if (_should_limit_output && !Base::_shared_state->enable_spill) {
+        if (_should_limit_output) {
             _reach_limit = _get_hash_table_size() >=
                            Base::_parent->template cast<AggSinkOperatorX>()._limit;
             if (_reach_limit &&
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 8545fe42f68..9e1710cfa54 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -535,11 +535,6 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
                         _shared_state->agg_arena_pool.get(), rows);
             }
         }
-
-        if (_should_limit_output) {
-            _reach_limit = _get_hash_table_size() >=
-                           Base::_parent->template cast<AggSourceOperatorX>()._limit;
-        }
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index ddf4a2d9b1a..bcfa9d27a04 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -93,9 +93,6 @@ protected:
     };
 
     executor _executor;
-
-    bool _should_limit_output = false;
-    bool _reach_limit = false;
 };
 
 class AggSourceOperatorX : public OperatorX<AggLocalState> {
diff --git a/regression-test/data/spill_p0/aggregate_spill.out b/regression-test/data/spill_p0/aggregate_spill.out
new file mode 100644
index 00000000000..e7251ff58e9
--- /dev/null
+++ b/regression-test/data/spill_p0/aggregate_spill.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !aggregate_spill --
+5      1
+
diff --git a/regression-test/suites/spill_p0/aggregate_spill.groovy b/regression-test/suites/spill_p0/aggregate_spill.groovy
new file mode 100644
index 00000000000..180ab37200f
--- /dev/null
+++ b/regression-test/suites/spill_p0/aggregate_spill.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("aggregate_spill") {
+    sql """
+        set enable_agg_spill = true;
+    """
+    sql """
+        set enable_force_spill = true;
+    """
+    sql """
+        set min_revocable_mem = 1;
+    """
+    sql """
+        set parallel_pipeline_task_num = 4;
+    """
+    sql """
+        drop table if exists aggregate_spill_test;
+    """
+    sql """
+        CREATE TABLE `aggregate_spill_test` (k1 int, k2 int replace) distributed by hash(k1) properties("replication_num"="1");
+    """
+    sql """
+        insert into aggregate_spill_test values(1, 1), (2, 1), (3, 1), (4, 1), (5, 1);
+    """
+    qt_aggregate_spill """
+        select count(), k2 from aggregate_spill_test group by k2 limit 1;
+    """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to