This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20000e6fdc0 [Exec](agg) Fix agg limit result error (#37025)
20000e6fdc0 is described below
commit 20000e6fdc0815021579cbc137f43d7bb6fc2ac7
Author: HappenLee <[email protected]>
AuthorDate: Mon Jul 1 09:49:04 2024 +0800
[Exec](agg) Fix agg limit result error (#37025)
This PR should be merged first, before merging #34853
---
be/src/pipeline/dependency.cpp | 10 ++++++----
be/src/pipeline/dependency.h | 3 ++-
be/src/pipeline/exec/aggregation_sink_operator.cpp | 4 +++-
be/src/pipeline/exec/aggregation_source_operator.cpp | 8 +++++++-
4 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 68c00af409d..4938883062a 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -248,7 +248,8 @@ void AggSharedState::build_limit_heap(size_t
hash_table_size) {
limit_columns_min = limit_heap.top()._row_id;
}
-bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t
num_rows) {
+bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows,
+ const std::vector<int>* key_locs) {
if (num_rows) {
cmp_res.resize(num_rows);
need_computes.resize(num_rows);
@@ -257,9 +258,10 @@ bool AggSharedState::do_limit_filter(vectorized::Block*
block, size_t num_rows)
const auto key_size = null_directions.size();
for (int i = 0; i < key_size; i++) {
- block->get_by_position(i).column->compare_internal(
- limit_columns_min, *limit_columns[i], null_directions[i],
order_directions[i],
- cmp_res, need_computes.data());
+ block->get_by_position(key_locs ? key_locs->operator[](i) : i)
+ .column->compare_internal(limit_columns_min,
*limit_columns[i],
+ null_directions[i],
order_directions[i], cmp_res,
+ need_computes.data());
}
auto set_computes_arr = [](auto* __restrict res, auto* __restrict
computes, int rows) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 5214022db13..8adc24d3b4e 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -311,7 +311,8 @@ public:
Status reset_hash_table();
- bool do_limit_filter(vectorized::Block* block, size_t num_rows);
+ bool do_limit_filter(vectorized::Block* block, size_t num_rows,
+ const std::vector<int>* key_locs = nullptr);
void build_limit_heap(size_t hash_table_size);
// We should call this function only at 1st phase.
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index fae987394b4..1dab1669dd5 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -329,6 +329,7 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
if (limit) {
need_do_agg = _emplace_into_hash_table_limit(_places.data(),
block, key_locs,
key_columns, rows);
+ rows = block->rows();
} else {
_emplace_into_hash_table(_places.data(), key_columns, rows);
}
@@ -589,7 +590,8 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
bool need_filter = false;
{
SCOPED_TIMER(_hash_table_limit_compute_timer);
- need_filter =
_shared_state->do_limit_filter(block, num_rows);
+ need_filter =
+ _shared_state->do_limit_filter(block,
num_rows, &key_locs);
}
auto& need_computes = _shared_state->need_computes;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 5b371877f36..1b7a151e2af 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -452,8 +452,14 @@ void AggLocalState::do_agg_limit(vectorized::Block* block,
bool* eos) {
if (_shared_state->reach_limit) {
if (_shared_state->do_sort_limit &&
_shared_state->do_limit_filter(block, block->rows())) {
vectorized::Block::filter_block_internal(block,
_shared_state->need_computes);
+ if (auto rows = block->rows()) {
+ _num_rows_returned += rows;
+ COUNTER_UPDATE(_blocks_returned_counter, 1);
+ COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+ }
+ } else {
+ reached_limit(block, eos);
}
- reached_limit(block, eos);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]