This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new af470361117 [Opt](exec) cherry pick the opt code #60137 #59492 #59446 #58728 (#61282)
af470361117 is described below
commit af4703611170acd23fbc04d8fef17f319c3f66d8
Author: HappenLee <[email protected]>
AuthorDate: Fri Mar 13 19:44:36 2026 +0800
[Opt](exec) cherry pick the opt code #60137 #59492 #59446 #58728 (#61282)
### What problem does this PR solve?
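Cherry-pick the execution optimizations from #60137, #59492, #59446, and #58728 into branch-4.1.
Problem Summary:
Streaming pre-aggregation now honors a sort-by-group-key limit (top-N pushdown) instead of requiring streaming to be disabled; scanners merge small blocks before applying projections; and compound AND/OR predicates are evaluated with vectorization-friendly loops.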
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add the documentation PR link here, e.g. https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add a branch-pick label for each branch this PR should be merged into -->
---
be/src/olap/comparison_predicate.h | 15 +-
.../exec/streaming_aggregation_operator.cpp | 301 ++++++++++++++++++---
.../pipeline/exec/streaming_aggregation_operator.h | 78 +++++-
be/src/vec/exec/scan/scanner.cpp | 35 ++-
be/src/vec/exec/scan/scanner.h | 12 +
be/src/vec/exprs/vcompound_pred.h | 67 +++--
.../operator/streaming_agg_operator_test.cpp | 20 +-
.../glue/translator/PhysicalPlanTranslator.java | 3 +-
.../nereids_tpch_p0/tpch/push_topn_to_agg.groovy | 5 +-
9 files changed, 454 insertions(+), 82 deletions(-)
diff --git a/be/src/olap/comparison_predicate.h
b/be/src/olap/comparison_predicate.h
index 6992112b63f..1ef691b1283 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -380,8 +380,8 @@ public:
}
template <bool is_and>
- void __attribute__((flatten))
- _evaluate_vec_internal(const vectorized::IColumn& column, uint16_t size,
bool* flags) const {
+ void __attribute__((flatten)) _evaluate_vec_internal(const
vectorized::IColumn& column,
+ uint16_t size, bool*
flags) const {
uint16_t current_evaluated_rows = 0;
uint16_t current_passed_rows = 0;
if (_can_ignore()) {
@@ -579,9 +579,10 @@ private:
}
template <bool is_nullable, bool is_and, typename TArray, typename TValue>
- void __attribute__((flatten))
- _base_loop_vec(uint16_t size, bool* __restrict bflags, const uint8_t*
__restrict null_map,
- const TArray* __restrict data_array, const TValue& value)
const {
+ void __attribute__((flatten)) _base_loop_vec(uint16_t size, bool*
__restrict bflags,
+ const uint8_t* __restrict
null_map,
+ const TArray* __restrict
data_array,
+ const TValue& value) const {
//uint8_t helps compiler to generate vectorized code
auto* flags = reinterpret_cast<uint8_t*>(bflags);
if constexpr (is_and) {
@@ -696,8 +697,8 @@ private:
}
}
- int32_t __attribute__((flatten))
- _find_code_from_dictionary_column(const vectorized::ColumnDictI32& column)
const {
+ int32_t __attribute__((flatten)) _find_code_from_dictionary_column(
+ const vectorized::ColumnDictI32& column) const {
static_assert(is_string_type(Type),
"Only string type predicate can use dictionary column.");
int32_t code = 0;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 6c0506e600c..6c0c412f819 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -99,6 +99,8 @@ Status StreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_insert_values_to_column_timer = ADD_TIMER(Base::custom_profile(),
"InsertValuesToColumnTime");
_deserialize_data_timer = ADD_TIMER(Base::custom_profile(),
"DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(Base::custom_profile(),
"HashTableComputeTime");
+ _hash_table_limit_compute_timer =
+ ADD_TIMER(Base::custom_profile(), "HashTableLimitComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(),
"HashTableEmplaceTime");
_hash_table_input_counter =
ADD_COUNTER(Base::custom_profile(), "HashTableInputCount",
TUnit::UNIT);
@@ -152,16 +154,10 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
}},
_agg_data->method_variant);
- if (p._is_merge || p._needs_finalize) {
- return Status::InvalidArgument(
- "StreamingAggLocalState only support no merge and no finalize,
"
- "but got is_merge={}, needs_finalize={}",
- p._is_merge, p._needs_finalize);
- }
-
- _should_limit_output = p._limit != -1 && // has limit
- (!p._have_conjuncts) && // no having conjunct
- p._needs_finalize; // agg's finalize step
+ limit = p._sort_limit;
+ do_sort_limit = p._do_sort_limit;
+ null_directions = p._null_directions;
+ order_directions = p._order_directions;
return Status::OK();
}
@@ -316,23 +312,22 @@ bool
StreamingAggLocalState::_should_not_do_pre_agg(size_t rows) {
const auto spill_streaming_agg_mem_limit =
p._spill_streaming_agg_mem_limit;
const bool used_too_much_memory =
spill_streaming_agg_mem_limit > 0 && _memory_usage() >
spill_streaming_agg_mem_limit;
- std::visit(
- vectorized::Overload {
- [&](std::monostate& arg) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- },
- [&](auto& agg_method) {
- auto& hash_tbl = *agg_method.hash_table;
- /// If too much memory is used during the
pre-aggregation stage,
- /// it is better to output the data directly without
performing further aggregation.
- // do not try to do agg, just init and serialize
directly return the out_block
- if (used_too_much_memory ||
(hash_tbl.add_elem_size_overflow(rows) &&
-
!_should_expand_preagg_hash_tables())) {
- SCOPED_TIMER(_streaming_agg_timer);
- ret_flag = true;
- }
- }},
- _agg_data->method_variant);
+ std::visit(vectorized::Overload {
+ [&](std::monostate& arg) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ },
+ [&](auto& agg_method) {
+ auto& hash_tbl = *agg_method.hash_table;
+ /// If too much memory is used during the
pre-aggregation stage,
+ /// it is better to output the data directly
without performing further aggregation.
+ // do not try to do agg, just init and serialize
directly return the out_block
+ if (used_too_much_memory ||
(hash_tbl.add_elem_size_overflow(rows) &&
+
!_should_expand_preagg_hash_tables())) {
+ SCOPED_TIMER(_streaming_agg_timer);
+ ret_flag = true;
+ }
+ }},
+ _agg_data->method_variant);
return ret_flag;
}
@@ -363,6 +358,30 @@ Status
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
_places.resize(rows);
if (_should_not_do_pre_agg(rows)) {
+ if (limit > 0) {
+ DCHECK(do_sort_limit);
+ if (need_do_sort_limit == -1) {
+ const size_t hash_table_size = _get_hash_table_size();
+ need_do_sort_limit = hash_table_size >= limit ? 1 : 0;
+ if (need_do_sort_limit == 1) {
+ build_limit_heap(hash_table_size);
+ }
+ }
+
+ if (need_do_sort_limit == 1) {
+ if (_do_limit_filter(rows, key_columns)) {
+ bool need_filter = std::find(need_computes.begin(),
need_computes.end(), 1) !=
+ need_computes.end();
+ if (need_filter) {
+ _add_limit_heap_top(key_columns, rows);
+ vectorized::Block::filter_block_internal(in_block,
need_computes);
+ rows = (uint32_t)in_block->rows();
+ } else {
+ return Status::OK();
+ }
+ }
+ }
+ }
bool mem_reuse = p._make_nullable_keys.empty() &&
out_block->mem_reuse();
std::vector<vectorized::DataTypePtr> data_types;
@@ -404,12 +423,23 @@ Status
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
}
}
} else {
- _emplace_into_hash_table(_places.data(), key_columns, rows);
+ bool need_agg = true;
+ if (need_do_sort_limit != 1) {
+ _emplace_into_hash_table(_places.data(), key_columns, rows);
+ } else {
+ need_agg = _emplace_into_hash_table_limit(_places.data(),
in_block, key_columns, rows);
+ }
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
- in_block, p._offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool,
- _should_expand_hash_table));
+ if (need_agg) {
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
+ in_block, p._offsets_of_aggregate_states[i],
_places.data(),
+ _agg_arena_pool, _should_expand_hash_table));
+ }
+ if (limit > 0 && need_do_sort_limit == -1 &&
_get_hash_table_size() >= limit) {
+ need_do_sort_limit = 1;
+ build_limit_heap(_get_hash_table_size());
+ }
}
}
@@ -561,6 +591,183 @@ void
StreamingAggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr da
}
}
+vectorized::MutableColumns StreamingAggLocalState::_get_keys_hash_table() {
+ return std::visit(
+ vectorized::Overload {
+ [&](std::monostate& arg) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return vectorized::MutableColumns();
+ },
+ [&](auto&& agg_method) -> vectorized::MutableColumns {
+ vectorized::MutableColumns key_columns;
+ for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+ key_columns.emplace_back(
+
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+ }
+ auto& data = *agg_method.hash_table;
+ bool has_null_key = data.has_null_key_data();
+ const auto size = data.size() - has_null_key;
+ using KeyType =
std::decay_t<decltype(agg_method)>::Key;
+ std::vector<KeyType> keys(size);
+
+ uint32_t num_rows = 0;
+ auto iter = _aggregate_data_container->begin();
+ {
+ while (iter != _aggregate_data_container->end()) {
+ keys[num_rows] = iter.get_key<KeyType>();
+ ++iter;
+ ++num_rows;
+ }
+ }
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows);
+ if (has_null_key) {
+ key_columns[0]->insert_data(nullptr, 0);
+ }
+ return key_columns;
+ }},
+ _agg_data->method_variant);
+}
+
+void StreamingAggLocalState::build_limit_heap(size_t hash_table_size) {
+ limit_columns = _get_keys_hash_table();
+ for (size_t i = 0; i < hash_table_size; ++i) {
+ limit_heap.emplace(i, limit_columns, order_directions,
null_directions);
+ }
+ while (hash_table_size > limit) {
+ limit_heap.pop();
+ hash_table_size--;
+ }
+ limit_columns_min = limit_heap.top()._row_id;
+}
+
+void StreamingAggLocalState::_add_limit_heap_top(vectorized::ColumnRawPtrs&
key_columns,
+ size_t rows) {
+ for (int i = 0; i < rows; ++i) {
+ if (cmp_res[i] == 1 && need_computes[i]) {
+ for (int j = 0; j < key_columns.size(); ++j) {
+ limit_columns[j]->insert_from(*key_columns[j], i);
+ }
+ limit_heap.emplace(limit_columns[0]->size() - 1, limit_columns,
order_directions,
+ null_directions);
+ limit_heap.pop();
+ limit_columns_min = limit_heap.top()._row_id;
+ break;
+ }
+ }
+}
+
+void StreamingAggLocalState::_refresh_limit_heap(size_t i,
vectorized::ColumnRawPtrs& key_columns) {
+ for (int j = 0; j < key_columns.size(); ++j) {
+ limit_columns[j]->insert_from(*key_columns[j], i);
+ }
+ limit_heap.emplace(limit_columns[0]->size() - 1, limit_columns,
order_directions,
+ null_directions);
+ limit_heap.pop();
+ limit_columns_min = limit_heap.top()._row_id;
+}
+
+bool
StreamingAggLocalState::_emplace_into_hash_table_limit(vectorized::AggregateDataPtr*
places,
+ vectorized::Block*
block,
+
vectorized::ColumnRawPtrs& key_columns,
+ uint32_t num_rows)
{
+ return std::visit(
+ vectorized::Overload {
+ [&](std::monostate& arg) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return true;
+ },
+ [&](auto&& agg_method) -> bool {
+ SCOPED_TIMER(_hash_table_compute_timer);
+ using HashMethodType =
std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
+
+ bool need_filter = _do_limit_filter(num_rows,
key_columns);
+ if (auto need_agg =
+ std::find(need_computes.begin(),
need_computes.end(), 1);
+ need_agg != need_computes.end()) {
+ if (need_filter) {
+
vectorized::Block::filter_block_internal(block, need_computes);
+ num_rows = (uint32_t)block->rows();
+ }
+
+ AggState state(key_columns);
+ agg_method.init_serialized_keys(key_columns,
num_rows);
+ size_t i = 0;
+
+ auto creator = [&](const auto& ctor, auto& key,
auto& origin) {
+ try {
+
HashMethodType::try_presis_key_and_origin(key, origin,
+
_agg_arena_pool);
+ auto mapped =
_aggregate_data_container->append_data(origin);
+ auto st = _create_agg_status(mapped);
+ if (!st) {
+ throw Exception(st.code(),
st.to_string());
+ }
+ ctor(key, mapped);
+ _refresh_limit_heap(i, key_columns);
+ } catch (...) {
+ // Exception-safety - if it can not
allocate memory or create status,
+ // the destructors will not be called.
+ ctor(key, nullptr);
+ throw;
+ }
+ };
+
+ auto creator_for_null_key = [&](auto& mapped) {
+ mapped = _agg_arena_pool.aligned_alloc(
+ Base::_parent->template
cast<StreamingAggOperatorX>()
+
._total_size_of_aggregate_states,
+ Base::_parent->template
cast<StreamingAggOperatorX>()
+ ._align_aggregate_states);
+ auto st = _create_agg_status(mapped);
+ if (!st) {
+ throw Exception(st.code(), st.to_string());
+ }
+ _refresh_limit_heap(i, key_columns);
+ };
+
+ SCOPED_TIMER(_hash_table_emplace_timer);
+ for (i = 0; i < num_rows; ++i) {
+ places[i] = *agg_method.lazy_emplace(state, i,
creator,
+
creator_for_null_key);
+ }
+ COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
+ return true;
+ }
+ return false;
+ }},
+ _agg_data->method_variant);
+}
+
+bool StreamingAggLocalState::_do_limit_filter(size_t num_rows,
+ vectorized::ColumnRawPtrs&
key_columns) {
+ SCOPED_TIMER(_hash_table_limit_compute_timer);
+ if (num_rows) {
+ cmp_res.resize(num_rows);
+ need_computes.resize(num_rows);
+ memset(need_computes.data(), 0, need_computes.size());
+ memset(cmp_res.data(), 0, cmp_res.size());
+
+ const auto key_size = null_directions.size();
+ for (int i = 0; i < key_size; i++) {
+ key_columns[i]->compare_internal(limit_columns_min,
*limit_columns[i],
+ null_directions[i],
order_directions[i], cmp_res,
+ need_computes.data());
+ }
+
+ auto set_computes_arr = [](auto* __restrict res, auto* __restrict
computes, size_t rows) {
+ for (size_t i = 0; i < rows; ++i) {
+ computes[i] = computes[i] == res[i];
+ }
+ };
+ set_computes_arr(cmp_res.data(), need_computes.data(), num_rows);
+
+ return std::find(need_computes.begin(), need_computes.end(), 0) !=
need_computes.end();
+ }
+
+ return false;
+}
+
void
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr*
places,
vectorized::ColumnRawPtrs& key_columns,
const uint32_t num_rows)
{
@@ -615,7 +822,6 @@ StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool*
pool, int operator_id,
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
- _is_merge(false),
_is_first_phase(tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase),
_have_conjuncts(tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples) {}
@@ -673,8 +879,33 @@ Status StreamingAggOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
}
const auto& agg_functions = tnode.agg_node.aggregate_functions;
- _is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
- [](const auto& e) { return
e.nodes[0].agg_expr.is_merge_agg; });
+ auto is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
+ [](const auto& e) { return
e.nodes[0].agg_expr.is_merge_agg; });
+ if (is_merge || _needs_finalize) {
+ return Status::InvalidArgument(
+ "StreamingAggLocalState only support no merge and no finalize,
"
+ "but got is_merge={}, needs_finalize={}",
+ is_merge, _needs_finalize);
+ }
+
+ // Handle sort limit
+ if (tnode.agg_node.__isset.agg_sort_info_by_group_key) {
+ _sort_limit = _limit;
+ _limit = -1;
+ _do_sort_limit = true;
+ const auto& agg_sort_info = tnode.agg_node.agg_sort_info_by_group_key;
+ DCHECK_EQ(agg_sort_info.nulls_first.size(),
agg_sort_info.is_asc_order.size());
+
+ const size_t order_by_key_size = agg_sort_info.is_asc_order.size();
+ _order_directions.resize(order_by_key_size);
+ _null_directions.resize(order_by_key_size);
+ for (int i = 0; i < order_by_key_size; ++i) {
+ _order_directions[i] = agg_sort_info.is_asc_order[i] ? 1 : -1;
+ _null_directions[i] =
+ agg_sort_info.nulls_first[i] ? -_order_directions[i] :
_order_directions[i];
+ }
+ }
+
_op_name = "STREAMING_AGGREGATION_OPERATOR";
return Status::OK();
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index d5b09c7eb25..a9e8cc54ba8 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -48,6 +48,7 @@ public:
Status do_pre_agg(RuntimeState* state, vectorized::Block* input_block,
vectorized::Block* output_block);
void make_nullable_output_key(vectorized::Block* block);
+ void build_limit_heap(size_t hash_table_size);
private:
friend class StreamingAggOperatorX;
@@ -55,6 +56,10 @@ private:
friend class StatefulOperatorX;
size_t _memory_usage() const;
+ void _add_limit_heap_top(vectorized::ColumnRawPtrs& key_columns, size_t
rows);
+ bool _do_limit_filter(size_t num_rows, vectorized::ColumnRawPtrs&
key_columns);
+ void _refresh_limit_heap(size_t i, vectorized::ColumnRawPtrs& key_columns);
+
Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
doris::vectorized::Block* out_block);
bool _should_expand_preagg_hash_tables();
@@ -68,11 +73,15 @@ private:
bool* eos);
void _emplace_into_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns,
const uint32_t num_rows);
+ bool _emplace_into_hash_table_limit(vectorized::AggregateDataPtr* places,
+ vectorized::Block* block,
+ vectorized::ColumnRawPtrs&
key_columns, uint32_t num_rows);
Status _create_agg_status(vectorized::AggregateDataPtr data);
size_t _get_hash_table_size();
RuntimeProfile::Counter* _streaming_agg_timer = nullptr;
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
+ RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr;
RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
@@ -95,10 +104,70 @@ private:
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<AggregateDataContainer> _aggregate_data_container =
nullptr;
- bool _should_limit_output = false;
bool _reach_limit = false;
size_t _input_num_rows = 0;
+ int64_t limit = -1;
+ int need_do_sort_limit = -1;
+ bool do_sort_limit = false;
+ vectorized::MutableColumns limit_columns;
+ int limit_columns_min = -1;
+ vectorized::PaddedPODArray<uint8_t> need_computes;
+ std::vector<uint8_t> cmp_res;
+ std::vector<int> order_directions;
+ std::vector<int> null_directions;
+
+ struct HeapLimitCursor {
+ HeapLimitCursor(int row_id, vectorized::MutableColumns& limit_columns,
+ std::vector<int>& order_directions, std::vector<int>&
null_directions)
+ : _row_id(row_id),
+ _limit_columns(limit_columns),
+ _order_directions(order_directions),
+ _null_directions(null_directions) {}
+
+ HeapLimitCursor(const HeapLimitCursor& other) = default;
+
+ HeapLimitCursor(HeapLimitCursor&& other) noexcept
+ : _row_id(other._row_id),
+ _limit_columns(other._limit_columns),
+ _order_directions(other._order_directions),
+ _null_directions(other._null_directions) {}
+
+ HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept {
+ _row_id = other._row_id;
+ return *this;
+ }
+
+ HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept {
+ _row_id = other._row_id;
+ return *this;
+ }
+
+ bool operator<(const HeapLimitCursor& rhs) const {
+ for (int i = 0; i < _limit_columns.size(); ++i) {
+ const auto& _limit_column = _limit_columns[i];
+ auto res = _limit_column->compare_at(_row_id, rhs._row_id,
*_limit_column,
+ _null_directions[i]) *
+ _order_directions[i];
+ if (res < 0) {
+ return true;
+ } else if (res > 0) {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ int _row_id;
+ vectorized::MutableColumns& _limit_columns;
+ std::vector<int>& _order_directions;
+ std::vector<int>& _null_directions;
+ };
+
+ std::priority_queue<HeapLimitCursor> limit_heap;
+
+ vectorized::MutableColumns _get_keys_hash_table();
+
vectorized::PODArray<vectorized::AggregateDataPtr> _places;
std::vector<char> _deserialize_buffer;
@@ -185,7 +254,6 @@ private:
TupleId _output_tuple_id;
TupleDescriptor* _output_tuple_desc = nullptr;
bool _needs_finalize;
- bool _is_merge;
const bool _is_first_phase;
size_t _align_aggregate_states = 1;
/// The offset to the n-th aggregate function in a row of aggregate
functions.
@@ -202,6 +270,12 @@ private:
std::vector<size_t> _make_nullable_keys;
bool _have_conjuncts;
RowDescriptor _agg_fn_output_row_descriptor;
+ // For sort limit
+ bool _do_sort_limit = false;
+ int64_t _sort_limit = -1;
+ std::vector<int> _order_directions;
+ std::vector<int> _null_directions;
+
std::vector<TExpr> _partition_exprs;
};
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index 483aa545eec..aad88f1b41b 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -79,8 +79,39 @@ Status Scanner::init(RuntimeState* state, const
VExprContextSPtrs& conjuncts) {
Status Scanner::get_block_after_projects(RuntimeState* state,
vectorized::Block* block, bool* eos) {
auto& row_descriptor = _local_state->_parent->row_descriptor();
if (_output_row_descriptor) {
-
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
- RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+ if (_alreay_eos) {
+ *eos = true;
+ _padding_block.swap(_origin_block);
+ } else {
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ const auto min_batch_size = std::max(state->batch_size() / 2, 1);
+ while (_padding_block.rows() < min_batch_size && !*eos) {
+ RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+ if (_origin_block.rows() >= min_batch_size) {
+ break;
+ }
+
+ if (_origin_block.rows() + _padding_block.rows() <=
state->batch_size()) {
+ RETURN_IF_ERROR(_merge_padding_block());
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ } else {
+ if (_origin_block.rows() < _padding_block.rows()) {
+ _padding_block.swap(_origin_block);
+ }
+ break;
+ }
+ }
+ }
+
+ // first output the origin block change eos = false, next time output
padding block
+ // set the eos to true
+ if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
+ _alreay_eos = true;
+ *eos = false;
+ }
+ if (_origin_block.empty() && !_padding_block.empty()) {
+ _padding_block.swap(_origin_block);
+ }
return _do_projections(&_origin_block, block);
} else {
return get_block(state, block, eos);
diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h
index dec0349c8fc..9840eac1fd8 100644
--- a/be/src/vec/exec/scan/scanner.h
+++ b/be/src/vec/exec/scan/scanner.h
@@ -107,6 +107,16 @@ protected:
// Subclass should implement this to return data.
virtual Status _get_block_impl(RuntimeState* state, Block* block, bool*
eof) = 0;
+ Status _merge_padding_block() {
+ if (_padding_block.empty()) {
+ _padding_block.swap(_origin_block);
+ } else if (_origin_block.rows()) {
+ RETURN_IF_ERROR(
+
MutableBlock::build_mutable_block(&_padding_block).merge(_origin_block));
+ }
+ return Status::OK();
+ }
+
// Update the counters before closing this scanner
virtual void _collect_profile_before_close();
@@ -209,6 +219,8 @@ protected:
// Used in common subexpression elimination to compute intermediate
results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
vectorized::Block _origin_block;
+ vectorized::Block _padding_block;
+ bool _alreay_eos = false;
VExprContextSPtrs _common_expr_ctxs_push_down;
diff --git a/be/src/vec/exprs/vcompound_pred.h
b/be/src/vec/exprs/vcompound_pred.h
index c3925786261..2d39319cedc 100644
--- a/be/src/vec/exprs/vcompound_pred.h
+++ b/be/src/vec/exprs/vcompound_pred.h
@@ -240,15 +240,7 @@ public:
result_column = std::move(col_res);
}
- if constexpr (is_and_op) {
- for (size_t i = 0; i < size; ++i) {
- lhs_data_column[i] &= rhs_data_column[i];
- }
- } else {
- for (size_t i = 0; i < size; ++i) {
- lhs_data_column[i] |= rhs_data_column[i];
- }
- }
+ do_not_null_pred<is_and_op>(lhs_data_column, rhs_data_column,
size);
};
auto vector_vector_null = [&]<bool is_and_op>() {
auto col_res = ColumnUInt8::create(size);
@@ -265,19 +257,9 @@ public:
auto* __restrict lhs_data_column_tmp = lhs_data_column;
auto* __restrict rhs_data_column_tmp = rhs_data_column;
- if constexpr (is_and_op) {
- for (size_t i = 0; i < size; ++i) {
- res_nulls[i] = apply_and_null(lhs_data_column_tmp[i],
lhs_null_map_tmp[i],
- rhs_data_column_tmp[i],
rhs_null_map_tmp[i]);
- res_datas[i] = lhs_data_column_tmp[i] &
rhs_data_column_tmp[i];
- }
- } else {
- for (size_t i = 0; i < size; ++i) {
- res_nulls[i] = apply_or_null(lhs_data_column_tmp[i],
lhs_null_map_tmp[i],
- rhs_data_column_tmp[i],
rhs_null_map_tmp[i]);
- res_datas[i] = lhs_data_column_tmp[i] |
rhs_data_column_tmp[i];
- }
- }
+ do_null_pred<is_and_op>(lhs_data_column_tmp, lhs_null_map_tmp,
rhs_data_column_tmp,
+ rhs_null_map_tmp, res_datas, res_nulls,
size);
+
result_column = ColumnNullable::create(std::move(col_res),
std::move(col_nulls));
};
@@ -358,6 +340,47 @@ private:
return (l_null & r_null) | (r_null & (r_null ^ a)) | (l_null & (l_null
^ b));
}
+ template <bool is_and>
+ void static do_not_null_pred(uint8_t* __restrict lhs, uint8_t* __restrict
rhs, size_t size) {
+#ifdef NDEBUG
+#if defined(__clang__)
+#pragma clang loop vectorize(enable)
+#elif defined(__GNUC__) && (__GNUC__ >= 5)
+#pragma GCC ivdep
+#endif
+#endif
+ for (size_t i = 0; i < size; ++i) {
+ if constexpr (is_and) {
+ lhs[i] &= rhs[i];
+ } else {
+ lhs[i] |= rhs[i];
+ }
+ }
+ }
+
+ template <bool is_and>
+ void static do_null_pred(uint8_t* __restrict lhs_data, uint8_t* __restrict
lhs_null,
+ uint8_t* __restrict rhs_data, uint8_t* __restrict
rhs_null,
+ uint8_t* __restrict res_data, uint8_t* __restrict
res_null,
+ size_t size) {
+#ifdef NDEBUG
+#if defined(__clang__)
+#pragma clang loop vectorize(enable)
+#elif defined(__GNUC__) && (__GNUC__ >= 5)
+#pragma GCC ivdep
+#endif
+#endif
+ for (size_t i = 0; i < size; ++i) {
+ if constexpr (is_and) {
+ res_null[i] = apply_and_null(lhs_data[i], lhs_null[i],
rhs_data[i], rhs_null[i]);
+ res_data[i] = lhs_data[i] & rhs_data[i];
+ } else {
+ res_null[i] = apply_or_null(lhs_data[i], lhs_null[i],
rhs_data[i], rhs_null[i]);
+ res_data[i] = lhs_data[i] | rhs_data[i];
+ }
+ }
+ }
+
bool _has_const_child() const {
return std::ranges::any_of(_children,
[](const VExprSPtr& arg) -> bool { return
arg->is_constant(); });
diff --git a/be/test/pipeline/operator/streaming_agg_operator_test.cpp
b/be/test/pipeline/operator/streaming_agg_operator_test.cpp
index 91ca56572be..664984db34a 100644
--- a/be/test/pipeline/operator/streaming_agg_operator_test.cpp
+++ b/be/test/pipeline/operator/streaming_agg_operator_test.cpp
@@ -109,7 +109,6 @@ TEST_F(StreamingAggOperatorTest, test1) {
false));
op->_pool = &pool;
op->_needs_finalize = false;
- op->_is_merge = false;
EXPECT_TRUE(op->set_child(child_op));
@@ -157,7 +156,9 @@ TEST_F(StreamingAggOperatorTest, test1) {
EXPECT_TRUE(op->need_more_input_data(state.get()));
}
- { EXPECT_TRUE(local_state->close(state.get()).ok()); }
+ {
+ EXPECT_TRUE(local_state->close(state.get()).ok());
+ }
}
TEST_F(StreamingAggOperatorTest, test2) {
@@ -166,7 +167,6 @@ TEST_F(StreamingAggOperatorTest, test2) {
false));
op->_pool = &pool;
op->_needs_finalize = false;
- op->_is_merge = false;
EXPECT_TRUE(op->set_child(child_op));
@@ -234,7 +234,9 @@ TEST_F(StreamingAggOperatorTest, test2) {
EXPECT_EQ(block.rows(), 3);
}
- { EXPECT_TRUE(local_state->close(state.get()).ok()); }
+ {
+ EXPECT_TRUE(local_state->close(state.get()).ok());
+ }
}
TEST_F(StreamingAggOperatorTest, test3) {
@@ -243,7 +245,6 @@ TEST_F(StreamingAggOperatorTest, test3) {
false));
op->_pool = &pool;
op->_needs_finalize = false;
- op->_is_merge = false;
EXPECT_TRUE(op->set_child(child_op));
@@ -314,7 +315,9 @@ TEST_F(StreamingAggOperatorTest, test3) {
EXPECT_EQ(block.rows(), 3);
}
- { EXPECT_TRUE(local_state->close(state.get()).ok()); }
+ {
+ EXPECT_TRUE(local_state->close(state.get()).ok());
+ }
}
TEST_F(StreamingAggOperatorTest, test4) {
@@ -323,7 +326,6 @@ TEST_F(StreamingAggOperatorTest, test4) {
std::make_shared<DataTypeBitMap>(),
false));
op->_pool = &pool;
op->_needs_finalize = false;
- op->_is_merge = false;
EXPECT_TRUE(op->set_child(child_op));
@@ -406,7 +408,9 @@ TEST_F(StreamingAggOperatorTest, test4) {
// << "Expected: " << res_block.dump_data() << ", but got: "
<< block.dump_data();
}
- { EXPECT_TRUE(local_state->close(state.get()).ok()); }
+ {
+ EXPECT_TRUE(local_state->close(state.get()).ok());
+ }
}
} // namespace doris::pipeline
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7f22f4aa59c..43b05b6fe10 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -346,8 +346,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
if (upstreamFragment.getPlanRoot() instanceof AggregationNode &&
upstream instanceof PhysicalHashAggregate) {
PhysicalHashAggregate<?> hashAggregate =
(PhysicalHashAggregate<?>) upstream;
if (hashAggregate.getAggPhase() == AggPhase.LOCAL
- && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER
- && hashAggregate.getTopnPushInfo() == null) {
+ && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) {
AggregationNode aggregationNode = (AggregationNode)
upstreamFragment.getPlanRoot();
aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream());
}
diff --git
a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
index 06975eef5ea..5e694b4781d 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
@@ -32,7 +32,6 @@ suite("push_topn_to_agg") {
explain{
sql "select o_custkey, sum(o_shippriority) from orders group by
o_custkey limit 4;"
multiContains ("sortByGroupKey:true", 2)
- notContains("STREAMING")
}
// when apply this opt, trun off STREAMING
@@ -40,14 +39,12 @@ suite("push_topn_to_agg") {
explain{
sql "select sum(c_custkey), c_name from customer group by c_name limit
6;"
multiContains ("sortByGroupKey:true", 2)
- notContains("STREAMING")
}
// topn -> agg
explain{
sql "select o_custkey, sum(o_shippriority) from orders group by
o_custkey order by o_custkey limit 8;"
multiContains ("sortByGroupKey:true", 2)
- notContains("STREAMING")
}
// order keys are part of group keys,
@@ -185,4 +182,4 @@ suite("push_topn_to_agg") {
| planed with unknown column statistics
|
+--------------------------------------------------------------------------------+
**/
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org