Copilot commented on code in PR #61169:
URL: https://github.com/apache/doris/pull/61169#discussion_r2909891512
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const
VExprContextSPtrs& probe_
return Status::OK();
}
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block,
- Block* output_block) {
- if (low_memory_mode()) {
- auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
- p.set_low_memory_mode(state);
- }
- RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
- // pre stream agg need use _num_row_return to decide whether to do pre
stream agg
- _cur_num_rows_returned += output_block->rows();
- make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block) {
+ RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
_update_memusage_with_serialized_key();
return Status::OK();
}
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
- if (!_should_expand_hash_table) {
- return false;
- }
+/// Flush the hash table contents into a new Block and push it to
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena,
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+ SCOPED_TIMER(_flush_timer);
+ auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+ const auto key_size = _probe_expr_ctxs.size();
+ const auto agg_size = _aggregate_evaluators.size();
return std::visit(
Overload {
- [&](std::monostate& arg) -> bool {
+ [&](std::monostate& arg) -> Status {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- return false;
+ return Status::OK();
},
- [&](auto& agg_method) -> bool {
- auto& hash_tbl = *agg_method.hash_table;
- auto [ht_mem, ht_rows] =
- std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) {
- return true;
+ [&](auto& agg_method) -> Status {
+ auto& data = *agg_method.hash_table;
+ const auto ht_size = data.size();
+ if (ht_size == 0) {
+ return Status::OK();
}
- // Find the appropriate reduction factor in our table
for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
- ++cache_level;
+ _abandoned_count += ht_size;
+
+ using KeyType =
std::decay_t<decltype(agg_method)>::Key;
+
+ MutableColumns key_columns;
+ for (size_t i = 0; i < key_size; ++i) {
+ key_columns.emplace_back(
+
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+ }
+
+ MutableColumns value_columns(agg_size);
+ DataTypes value_data_types(agg_size);
+ for (size_t i = 0; i < agg_size; ++i) {
+ value_data_types[i] =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
+
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
+
+ // Iterate through the AggregateDataContainer to
extract keys and values.
+ std::vector<KeyType> keys(ht_size);
+ std::vector<AggregateDataPtr> values(ht_size);
+
+ uint32_t num_rows = 0;
+ auto iter = _aggregate_data_container->begin();
+ while (iter != _aggregate_data_container->end() &&
num_rows < ht_size) {
+ keys[num_rows] = iter.template get_key<KeyType>();
+ values[num_rows] = iter.get_aggregate_data();
+ ++iter;
+ ++num_rows;
+ }
+
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows);
+
+ // Handle null key if present
+ if (data.has_null_key_data()) {
+ DCHECK(key_columns.size() == 1);
+ DCHECK(key_columns[0]->is_nullable());
+ key_columns[0]->insert_data(nullptr, 0);
+ values.push_back(data.template
get_null_key_data<AggregateDataPtr>());
+ ++num_rows;
}
- // Compare the number of rows in the hash table with
the number of input rows that
- // were aggregated into it. Exclude passed through
rows from this calculation since
- // they were not in hash tables.
- const int64_t input_rows = _input_num_rows;
- const int64_t aggregated_input_rows = input_rows -
_cur_num_rows_returned;
- // TODO chenhao
- // const int64_t expected_input_rows =
estimated_input_cardinality_ - num_rows_returned_;
- double current_reduction =
static_cast<double>(aggregated_input_rows) /
-
static_cast<double>(ht_rows);
-
- // TODO: workaround for IMPALA-2490: subplan node
rows_returned counter may be
- // inaccurate, which could lead to a divide by zero
below.
- if (aggregated_input_rows <= 0) {
- return true;
+ // Serialize aggregate states to columns
+ for (size_t i = 0; i < agg_size; ++i) {
+
_aggregate_evaluators[i]->function()->serialize_to_column(
+ values, p._offsets_of_aggregate_states[i],
value_columns[i],
+ num_rows);
}
- // Extrapolate the current reduction factor (r) using
the formula
- // R = 1 + (N / n) * (r - 1), where R is the reduction
factor over the full input data
- // set, N is the number of input rows, excluding
passed-through rows, and n is the
- // number of rows inserted or merged into the hash
tables. This is a very rough
- // approximation but is good enough to be useful.
- // TODO: consider collecting more statistics to better
estimate reduction.
- // double estimated_reduction = aggregated_input_rows
>= expected_input_rows
- // ? current_reduction
- // : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
- // COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
- // COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
- // return estimated_reduction > min_reduction;
- _should_expand_hash_table = current_reduction >
min_reduction;
- return _should_expand_hash_table;
+ // Build the output block
+ ColumnsWithTypeAndName columns_with_schema;
+ for (size_t i = 0; i < key_size; ++i) {
+ columns_with_schema.emplace_back(
+ std::move(key_columns[i]),
+ _probe_expr_ctxs[i]->root()->data_type(),
+ _probe_expr_ctxs[i]->root()->expr_name());
+ }
+ for (size_t i = 0; i < agg_size; ++i) {
+
columns_with_schema.emplace_back(std::move(value_columns[i]),
+
value_data_types[i], "");
+ }
+ Block flush_block(columns_with_schema);
+ make_nullable_output_key(&flush_block);
+ _output_blocks.push_back(std::move(flush_block));
+
Review Comment:
`_flush_and_reset_hash_table()` materializes the entire hash table into a
single `flush_block` and pushes it to `_output_blocks`. With
`MICRO_HT_CAPACITY` this block can be very large (e.g., O(1e5) rows), while
other result paths (`_get_results_with_serialized_key`) cap output to
`state->batch_size()`. Consider chunking the flush output into blocks of at
most `batch_size` rows (or otherwise ensuring downstream operators can safely
handle blocks much larger than `batch_size`).
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -382,68 +432,127 @@ Status
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::Block* in_blo
}
}
}
- bool mem_reuse = p._make_nullable_keys.empty() &&
out_block->mem_reuse();
-
- std::vector<DataTypePtr> data_types;
- MutableColumns value_columns;
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- auto data_type =
_aggregate_evaluators[i]->function()->get_serialized_type();
- if (mem_reuse) {
- value_columns.emplace_back(
- std::move(*out_block->get_by_position(i +
key_size).column).mutate());
- } else {
- value_columns.emplace_back(
-
_aggregate_evaluators[i]->function()->create_serialize_column());
- }
- data_types.emplace_back(data_type);
- }
- for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
- SCOPED_TIMER(_insert_values_to_column_timer);
-
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
- in_block, value_columns[i], rows, _agg_arena_pool));
- }
+ return _output_pass_through(in_block, key_columns, rows);
+ }
- if (!mem_reuse) {
- ColumnsWithTypeAndName columns_with_schema;
- for (int i = 0; i < key_size; ++i) {
-
columns_with_schema.emplace_back(key_columns[i]->clone_resized(rows),
-
_probe_expr_ctxs[i]->root()->data_type(),
-
_probe_expr_ctxs[i]->root()->expr_name());
- }
- for (int i = 0; i < value_columns.size(); ++i) {
- columns_with_schema.emplace_back(std::move(value_columns[i]),
data_types[i], "");
- }
- out_block->swap(Block(columns_with_schema));
- } else {
- for (int i = 0; i < key_size; ++i) {
- std::move(*out_block->get_by_position(i).column)
- .mutate()
- ->insert_range_from(*key_columns[i], 0, rows);
- }
- }
- } else {
- bool need_agg = true;
- if (need_do_sort_limit != 1) {
- _emplace_into_hash_table(_places.data(), key_columns, rows);
- } else {
- need_agg = _emplace_into_hash_table_limit(_places.data(),
in_block, key_columns, rows);
- }
+ // --- Normal aggregation path (PROBING / RESIZED / STABLE) ---
+ _places.resize(rows);
- if (need_agg) {
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
- in_block, p._offsets_of_aggregate_states[i],
_places.data(),
- _agg_arena_pool, _should_expand_hash_table));
- }
- if (limit > 0 && need_do_sort_limit == -1 &&
_get_hash_table_size() >= limit) {
- need_do_sort_limit = 1;
- build_limit_heap(_get_hash_table_size());
- }
+ // Check if the adaptive decision needs to be made.
+ // We check before processing the current batch so that the decision takes
effect
+ // starting from this batch.
+ const size_t sink_count_before = _sink_count;
+ _sink_count += rows;
+
+ if (_adaptive_phase == AdaptivePhase::PROBING && sink_count_before <
ADAPTIVITY_THRESHOLD &&
+ _sink_count >= ADAPTIVITY_THRESHOLD) {
+ _check_adaptive_decision();
+
+ // If decision was pass-through, flush HT then handle this batch as
pass-through
+ if (_adaptive_phase == AdaptivePhase::PASS_THROUGH) {
+ RETURN_IF_ERROR(_flush_and_reset_hash_table());
+ return _output_pass_through(in_block, key_columns, rows);
}
}
- return Status::OK();
+ // Process the batch: row-by-row emplace with abandon-on-full logic.
+ return std::visit(
+ Overload {[&](std::monostate& arg) -> Status {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return Status::OK();
+ },
+ [&](auto& agg_method) -> Status {
+ SCOPED_TIMER(_hash_table_compute_timer);
+ using HashMethodType =
std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns);
+ agg_method.init_serialized_keys(key_columns, rows);
+
+ // Update HLL with hash values during PROBING phase
+ if (_adaptive_phase == AdaptivePhase::PROBING &&
_hll) {
+ for (uint32_t i = 0; i < rows; ++i) {
+ _hll->update(agg_method.hash_values[i]);
+ }
+ }
+
+ auto creator = [this](const auto& ctor, auto& key,
auto& origin) {
+ HashMethodType::try_presis_key_and_origin(key,
origin,
+
_agg_arena_pool);
+ auto mapped =
_aggregate_data_container->append_data(origin);
+ auto st = _create_agg_status(mapped);
+ if (!st) {
+ throw Exception(st.code(), st.to_string());
+ }
+ ctor(key, mapped);
+ };
+
+ auto creator_for_null_key = [&](auto& mapped) {
+ mapped = _agg_arena_pool.aligned_alloc(
+ p._total_size_of_aggregate_states,
p._align_aggregate_states);
+ auto st = _create_agg_status(mapped);
+ if (!st) {
+ throw Exception(st.code(), st.to_string());
+ }
+ };
+
+ // Initialize all places to nullptr so that
+ // execute_batch_add_selected skips unprocessed rows.
+ std::fill(_places.begin(), _places.begin() + rows,
nullptr);
+
+ {
+ SCOPED_TIMER(_hash_table_emplace_timer);
+ for (uint32_t i = 0; i < rows; ++i) {
+ auto& hash_tbl = *agg_method.hash_table;
+
+ // Check if HT is full before inserting
(would-overflow check).
+ // If so, aggregate all pending rows, flush,
then continue.
+ if (hash_tbl.add_elem_size_overflow(1)) {
+ // Aggregate the rows emplaced since the
last flush.
+ // _places entries for unprocessed rows
are nullptr and
+ // will be skipped by
execute_batch_add_selected.
+ for (size_t j = 0; j <
_aggregate_evaluators.size(); ++j) {
+ RETURN_IF_ERROR(
+ _aggregate_evaluators[j]
+
->execute_batch_add_selected(
+ in_block,
+
p._offsets_of_aggregate_states[j],
+
_places.data(), _agg_arena_pool));
+ }
+ // Null out already-aggregated places so
they won't be
+ // re-aggregated in subsequent calls.
+ for (uint32_t k = 0; k < i; ++k) {
+ _places[k] = nullptr;
+ }
+
+
RETURN_IF_ERROR(_flush_and_reset_hash_table());
+ }
+
+ _places[i] = *agg_method.lazy_emplace(state,
i, creator,
+
creator_for_null_key);
+ }
+ }
+
+ COUNTER_UPDATE(_hash_table_input_counter, rows);
+
+ // Execute aggregate functions for remaining rows
that haven't
+ // been aggregated yet (non-null _places entries).
+ for (size_t i = 0; i < _aggregate_evaluators.size();
++i) {
+
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
+ in_block,
p._offsets_of_aggregate_states[i], _places.data(),
+ _agg_arena_pool));
+ }
+
+ // Handle sort limit: init heap if we've reached the
limit
+ if (limit > 0 && need_do_sort_limit == -1 &&
+ _get_hash_table_size() >= limit) {
+ need_do_sort_limit = 1;
+ build_limit_heap(_get_hash_table_size());
+ }
Review Comment:
The sort-limit path for the normal aggregation mode appears incomplete: once
`need_do_sort_limit` becomes 1 and `build_limit_heap()` is called, subsequent
batches are no longer filtered by `_do_limit_filter()` (and the heap is never
refreshed for new keys). Previously this was handled via
`_emplace_into_hash_table_limit()`/`_refresh_limit_heap()`. As-is,
`do_sort_limit` will not actually reduce incoming rows or constrain the hash
table, which can lead to incorrect top-N by group key behavior and/or unbounded
HT growth. Please reintroduce the limit-filter + heap-refresh logic on the
aggregation path when `need_do_sort_limit == 1`.
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const
VExprContextSPtrs& probe_
return Status::OK();
}
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block,
- Block* output_block) {
- if (low_memory_mode()) {
- auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
- p.set_low_memory_mode(state);
- }
- RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
- // pre stream agg need use _num_row_return to decide whether to do pre
stream agg
- _cur_num_rows_returned += output_block->rows();
- make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block) {
+ RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
_update_memusage_with_serialized_key();
return Status::OK();
}
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
- if (!_should_expand_hash_table) {
- return false;
- }
+/// Flush the hash table contents into a new Block and push it to
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena,
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+ SCOPED_TIMER(_flush_timer);
+ auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+ const auto key_size = _probe_expr_ctxs.size();
+ const auto agg_size = _aggregate_evaluators.size();
return std::visit(
Overload {
- [&](std::monostate& arg) -> bool {
+ [&](std::monostate& arg) -> Status {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- return false;
+ return Status::OK();
},
- [&](auto& agg_method) -> bool {
- auto& hash_tbl = *agg_method.hash_table;
- auto [ht_mem, ht_rows] =
- std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) {
- return true;
+ [&](auto& agg_method) -> Status {
+ auto& data = *agg_method.hash_table;
+ const auto ht_size = data.size();
+ if (ht_size == 0) {
+ return Status::OK();
}
- // Find the appropriate reduction factor in our table
for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
- ++cache_level;
+ _abandoned_count += ht_size;
+
+ using KeyType =
std::decay_t<decltype(agg_method)>::Key;
+
+ MutableColumns key_columns;
+ for (size_t i = 0; i < key_size; ++i) {
+ key_columns.emplace_back(
+
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+ }
+
+ MutableColumns value_columns(agg_size);
+ DataTypes value_data_types(agg_size);
+ for (size_t i = 0; i < agg_size; ++i) {
+ value_data_types[i] =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
+
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
+
+ // Iterate through the AggregateDataContainer to
extract keys and values.
+ std::vector<KeyType> keys(ht_size);
+ std::vector<AggregateDataPtr> values(ht_size);
+
Review Comment:
In _flush_and_reset_hash_table(), `ht_size` comes from `data.size()` (which
includes the null-key row when `has_null_key_data()` is true), but the
`keys/values` vectors are sized to `ht_size` while the AggregateDataContainer
only stores non-null keys. This means the last slot in `values` can remain
uninitialized in the null-key case and later gets serialized, producing invalid
pointers / crashes. Consider sizing the vectors to the container count (e.g.,
`ht_size - has_null_key`), and when a null key exists, write it into
`values[num_rows]` (or resize before push) so that `values[0..num_rows-1]` are
all valid.
##########
be/src/exec/operator/streaming_aggregation_operator.h:
##########
@@ -52,27 +66,42 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public
PipelineXLocalState<Fak
template <typename LocalStateType>
friend class StatefulOperatorX;
- size_t _memory_usage() const;
+ // --- Constants for adaptive streaming aggregation ---
+ /// Micro hash table slot capacity (2^17).
+ static constexpr size_t MICRO_HT_CAPACITY = 131072;
+ /// Row threshold for adaptive decision (2^20 = 1,048,576).
+ static constexpr size_t ADAPTIVITY_THRESHOLD = 1048576;
+ /// Rule A: cardinality ratio above which we go pass-through.
+ static constexpr double HIGH_CARDINALITY_RATIO = 0.95;
+ /// Rule B: abandoned_count / hll_count ratio above which we resize.
+ static constexpr double ABANDON_RATIO_THRESHOLD = 2.0;
+ /// Hard memory ceiling for Rule B resize (256MB).
+ static constexpr size_t MAX_HT_MEMORY_BYTES = 256ULL * 1024 * 1024;
+
void _add_limit_heap_top(ColumnRawPtrs& key_columns, size_t rows);
bool _do_limit_filter(size_t num_rows, ColumnRawPtrs& key_columns);
- void _refresh_limit_heap(size_t i, ColumnRawPtrs& key_columns);
+ Status _pre_agg_with_serialized_key(doris::Block* in_block);
- Status _pre_agg_with_serialized_key(doris::Block* in_block, doris::Block*
out_block);
- bool _should_expand_preagg_hash_tables();
-
- MOCK_FUNCTION bool _should_not_do_pre_agg(size_t rows);
-
- Status _execute_with_serialized_key(Block* block);
void _update_memusage_with_serialized_key();
Status _init_hash_method(const VExprContextSPtrs& probe_exprs);
Status _get_results_with_serialized_key(RuntimeState* state, Block* block,
bool* eos);
- void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs&
key_columns,
- const uint32_t num_rows);
- bool _emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block,
- ColumnRawPtrs& key_columns, uint32_t
num_rows);
Status _create_agg_status(AggregateDataPtr data);
size_t _get_hash_table_size();
Review Comment:
This change removes the `MOCK_FUNCTION _should_not_do_pre_agg(...)` path
(and related helpers like `_should_expand_preagg_hash_tables()` /
`_memory_usage()`), but there are existing unit tests that subclass
`StreamingAggLocalState` and override/call these methods (e.g.
`be/test/exec/operator/streaming_agg_operator_test.cpp`). The PR will break the
build unless those tests (or any other subclasses) are updated to the new
adaptive-phase mechanism, or compatible hooks are kept.
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -1010,14 +980,18 @@ Status StreamingAggLocalState::close(RuntimeState*
state) {
Status StreamingAggOperatorX::pull(RuntimeState* state, Block* block, bool*
eos) const {
auto& local_state = get_local_state(state);
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
- if (!local_state._pre_aggregated_block->empty()) {
- local_state._pre_aggregated_block->swap(*block);
- } else {
+ if (!local_state._output_blocks.empty()) {
+ // Pop a block from the output queue produced during the last push.
+ *block = std::move(local_state._output_blocks.front());
+ local_state._output_blocks.pop_front();
+ } else if (local_state._child_eos) {
Review Comment:
`pull()` does not set `*eos` when returning a block from `_output_blocks`.
If the caller passes in `*eos` already set to true (e.g., left over from a
previous call), this can prematurely signal EOS while there are still queued
blocks or more input to process. Set `*eos = false` when popping from
`_output_blocks`, and ensure EOS is only propagated when draining the hash
table after `_child_eos` (or when `reached_limit()` sets it).
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const
VExprContextSPtrs& probe_
return Status::OK();
}
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block,
- Block* output_block) {
- if (low_memory_mode()) {
- auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
- p.set_low_memory_mode(state);
- }
- RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
- // pre stream agg need use _num_row_return to decide whether to do pre
stream agg
- _cur_num_rows_returned += output_block->rows();
- make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block) {
+ RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
_update_memusage_with_serialized_key();
return Status::OK();
}
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
- if (!_should_expand_hash_table) {
- return false;
- }
+/// Flush the hash table contents into a new Block and push it to
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena,
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+ SCOPED_TIMER(_flush_timer);
+ auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+ const auto key_size = _probe_expr_ctxs.size();
+ const auto agg_size = _aggregate_evaluators.size();
return std::visit(
Overload {
- [&](std::monostate& arg) -> bool {
+ [&](std::monostate& arg) -> Status {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- return false;
+ return Status::OK();
},
- [&](auto& agg_method) -> bool {
- auto& hash_tbl = *agg_method.hash_table;
- auto [ht_mem, ht_rows] =
- std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) {
- return true;
+ [&](auto& agg_method) -> Status {
+ auto& data = *agg_method.hash_table;
+ const auto ht_size = data.size();
+ if (ht_size == 0) {
+ return Status::OK();
}
- // Find the appropriate reduction factor in our table
for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
- ++cache_level;
+ _abandoned_count += ht_size;
+
+ using KeyType =
std::decay_t<decltype(agg_method)>::Key;
+
+ MutableColumns key_columns;
+ for (size_t i = 0; i < key_size; ++i) {
+ key_columns.emplace_back(
+
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+ }
+
+ MutableColumns value_columns(agg_size);
+ DataTypes value_data_types(agg_size);
+ for (size_t i = 0; i < agg_size; ++i) {
+ value_data_types[i] =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
+
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
+
+ // Iterate through the AggregateDataContainer to
extract keys and values.
+ std::vector<KeyType> keys(ht_size);
+ std::vector<AggregateDataPtr> values(ht_size);
+
+ uint32_t num_rows = 0;
+ auto iter = _aggregate_data_container->begin();
+ while (iter != _aggregate_data_container->end() &&
num_rows < ht_size) {
+ keys[num_rows] = iter.template get_key<KeyType>();
+ values[num_rows] = iter.get_aggregate_data();
+ ++iter;
+ ++num_rows;
+ }
+
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows);
+
+ // Handle null key if present
+ if (data.has_null_key_data()) {
+ DCHECK(key_columns.size() == 1);
+ DCHECK(key_columns[0]->is_nullable());
+ key_columns[0]->insert_data(nullptr, 0);
+ values.push_back(data.template
get_null_key_data<AggregateDataPtr>());
+ ++num_rows;
}
- // Compare the number of rows in the hash table with
the number of input rows that
- // were aggregated into it. Exclude passed through
rows from this calculation since
- // they were not in hash tables.
- const int64_t input_rows = _input_num_rows;
- const int64_t aggregated_input_rows = input_rows -
_cur_num_rows_returned;
- // TODO chenhao
- // const int64_t expected_input_rows =
estimated_input_cardinality_ - num_rows_returned_;
- double current_reduction =
static_cast<double>(aggregated_input_rows) /
-
static_cast<double>(ht_rows);
-
- // TODO: workaround for IMPALA-2490: subplan node
rows_returned counter may be
- // inaccurate, which could lead to a divide by zero
below.
- if (aggregated_input_rows <= 0) {
- return true;
+ // Serialize aggregate states to columns
+ for (size_t i = 0; i < agg_size; ++i) {
+
_aggregate_evaluators[i]->function()->serialize_to_column(
+ values, p._offsets_of_aggregate_states[i],
value_columns[i],
+ num_rows);
}
- // Extrapolate the current reduction factor (r) using
the formula
- // R = 1 + (N / n) * (r - 1), where R is the reduction
factor over the full input data
- // set, N is the number of input rows, excluding
passed-through rows, and n is the
- // number of rows inserted or merged into the hash
tables. This is a very rough
- // approximation but is good enough to be useful.
- // TODO: consider collecting more statistics to better
estimate reduction.
- // double estimated_reduction = aggregated_input_rows
>= expected_input_rows
- // ? current_reduction
- // : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
- // COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
- // COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
- // return estimated_reduction > min_reduction;
- _should_expand_hash_table = current_reduction >
min_reduction;
- return _should_expand_hash_table;
+ // Build the output block
+ ColumnsWithTypeAndName columns_with_schema;
+ for (size_t i = 0; i < key_size; ++i) {
+ columns_with_schema.emplace_back(
+ std::move(key_columns[i]),
+ _probe_expr_ctxs[i]->root()->data_type(),
+ _probe_expr_ctxs[i]->root()->expr_name());
+ }
+ for (size_t i = 0; i < agg_size; ++i) {
+
columns_with_schema.emplace_back(std::move(value_columns[i]),
+
value_data_types[i], "");
+ }
+ Block flush_block(columns_with_schema);
+ make_nullable_output_key(&flush_block);
+ _output_blocks.push_back(std::move(flush_block));
+
+ // Destroy all aggregate states
+ data.for_each_mapped([&](auto& mapped) {
+ if (mapped) {
+ _destroy_agg_status(mapped);
+ mapped = nullptr;
+ }
+ });
+ if (data.has_null_key_data()) {
+ _destroy_agg_status(
+ data.template
get_null_key_data<AggregateDataPtr>());
+ }
+
+ // Clear the hash table (keeps capacity/buffer
allocated)
+ data.clear();
+
+ // Reset arena and aggregate data container
+ _agg_arena_pool.clear();
+ _reset_aggregate_data_container();
+
+ return Status::OK();
}},
_agg_data->method_variant);
}
-size_t StreamingAggLocalState::_memory_usage() const {
- size_t usage = 0;
- usage += _agg_arena_pool.size();
-
- if (_aggregate_data_container) {
- usage += _aggregate_data_container->memory_usage();
- }
-
+void StreamingAggLocalState::_reset_aggregate_data_container() {
+ auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
std::visit(Overload {[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
},
[&](auto& agg_method) {
- usage +=
agg_method.hash_table->get_buffer_size_in_bytes();
+ using HashTableType =
std::decay_t<decltype(agg_method)>;
+ using KeyType = typename HashTableType::Key;
+ _aggregate_data_container =
std::make_unique<AggregateDataContainer>(
+ sizeof(KeyType),
((p._total_size_of_aggregate_states +
+
p._align_aggregate_states - 1) /
+
p._align_aggregate_states) *
+
p._align_aggregate_states);
}},
_agg_data->method_variant);
+}
- return usage;
+void StreamingAggLocalState::_check_adaptive_decision() {
+ DCHECK(_sink_count >= ADAPTIVITY_THRESHOLD);
+ DCHECK(_hll != nullptr);
+
+ const int64_t hll_count = _hll->estimate_cardinality();
+ const double cardinality_ratio =
+ static_cast<double>(hll_count) /
static_cast<double>(ADAPTIVITY_THRESHOLD);
+
+ // Rule A: extremely high cardinality → pass-through
+ if (cardinality_ratio > HIGH_CARDINALITY_RATIO) {
+ _adaptive_phase = AdaptivePhase::PASS_THROUGH;
+ _hll.reset();
+ return;
+ }
+
+ // Rule B: hash table capacity bottleneck → resize
+ if (hll_count > 0 && static_cast<double>(_abandoned_count) /
static_cast<double>(hll_count) >
+ ABANDON_RATIO_THRESHOLD) {
+ _adaptive_phase = AdaptivePhase::RESIZED;
+ _hll.reset();
+
+ // Calculate target capacity: hll_count * 1.5, capped by
MAX_HT_MEMORY_BYTES.
+ const auto target_elements = static_cast<size_t>(double(hll_count) *
1.5);
+ std::visit(Overload {[&](std::monostate& arg) -> void {
+ throw
doris::Exception(ErrorCode::INTERNAL_ERROR,
+ "uninited hash table");
+ },
+ [&](auto& agg_method) -> void {
+ using HashMapType =
+ typename
std::decay_t<decltype(agg_method)>::HashMapType;
+ // Estimate memory for the target capacity.
+ // PHHashMap slot_type size gives us a rough
per-slot memory cost.
+ const size_t per_slot_bytes =
+ sizeof(typename
HashMapType::value_type) +
+ 1 /* overhead */;
+ // PHHashMap reserves capacity rounded up by
phmap internally.
+ size_t capped_elements = target_elements;
+ if (capped_elements * per_slot_bytes >
MAX_HT_MEMORY_BYTES) {
+ capped_elements = MAX_HT_MEMORY_BYTES /
per_slot_bytes;
+ }
+
agg_method.hash_table->reserve(capped_elements);
+ }},
+ _agg_data->method_variant);
Review Comment:
In `_check_adaptive_decision()` Rule B, calling
`agg_method.hash_table->reserve(capped_elements)` will rehash/resize the
underlying hash table while it already contains aggregated data. This is fine
for PHHashMap, but for StringHashMap the newly added `reserve()` is destructive
(it re-initializes buffers), which would drop partial aggregation state and
corrupt results. Please guard this resize to only safe hash table types, or
flush/reset the current HT before reserving a new capacity, or implement a
non-destructive reserve for StringHashMap.
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const
VExprContextSPtrs& probe_
return Status::OK();
}
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block,
- Block* output_block) {
- if (low_memory_mode()) {
- auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
- p.set_low_memory_mode(state);
- }
- RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
- // pre stream agg need use _num_row_return to decide whether to do pre
stream agg
- _cur_num_rows_returned += output_block->rows();
- make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block*
input_block) {
+ RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
_update_memusage_with_serialized_key();
return Status::OK();
}
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
- if (!_should_expand_hash_table) {
- return false;
- }
+/// Flush the hash table contents into a new Block and push it to
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena,
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+ SCOPED_TIMER(_flush_timer);
+ auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+ const auto key_size = _probe_expr_ctxs.size();
+ const auto agg_size = _aggregate_evaluators.size();
return std::visit(
Overload {
- [&](std::monostate& arg) -> bool {
+ [&](std::monostate& arg) -> Status {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- return false;
+ return Status::OK();
},
- [&](auto& agg_method) -> bool {
- auto& hash_tbl = *agg_method.hash_table;
- auto [ht_mem, ht_rows] =
- std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) {
- return true;
+ [&](auto& agg_method) -> Status {
+ auto& data = *agg_method.hash_table;
+ const auto ht_size = data.size();
+ if (ht_size == 0) {
+ return Status::OK();
}
- // Find the appropriate reduction factor in our table
for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
- ++cache_level;
+ _abandoned_count += ht_size;
+
+ using KeyType =
std::decay_t<decltype(agg_method)>::Key;
+
+ MutableColumns key_columns;
+ for (size_t i = 0; i < key_size; ++i) {
+ key_columns.emplace_back(
+
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+ }
+
+ MutableColumns value_columns(agg_size);
+ DataTypes value_data_types(agg_size);
+ for (size_t i = 0; i < agg_size; ++i) {
+ value_data_types[i] =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
+
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
+
+ // Iterate through the AggregateDataContainer to
extract keys and values.
+ std::vector<KeyType> keys(ht_size);
+ std::vector<AggregateDataPtr> values(ht_size);
+
+ uint32_t num_rows = 0;
+ auto iter = _aggregate_data_container->begin();
+ while (iter != _aggregate_data_container->end() &&
num_rows < ht_size) {
+ keys[num_rows] = iter.template get_key<KeyType>();
+ values[num_rows] = iter.get_aggregate_data();
+ ++iter;
+ ++num_rows;
+ }
+
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows);
+
+ // Handle null key if present
+ if (data.has_null_key_data()) {
+ DCHECK(key_columns.size() == 1);
+ DCHECK(key_columns[0]->is_nullable());
+ key_columns[0]->insert_data(nullptr, 0);
+ values.push_back(data.template
get_null_key_data<AggregateDataPtr>());
+ ++num_rows;
}
Review Comment:
When `data.has_null_key_data()` is true, `values.push_back(...)` appends the
null-key aggregate state at the end, but `num_rows` is incremented and
serialization reads only the first `num_rows` entries. Because `values` was
pre-sized to `ht_size`, the pushed-back element lands at index `ht_size`,
past the pre-sized region; if the main loop filled fewer than `ht_size`
slots, the final serialized entry is the never-initialized
`values[ht_size-1]`, while the actual null-key state at index `ht_size` is
skipped. Prefer assigning the null-key value into `values[num_rows]` before
incrementing, or shrinking `values` to `num_rows` before the `push_back()`.
##########
be/src/exec/common/hash_table/string_hash_map.h:
##########
@@ -155,5 +155,31 @@ class StringHashMap : public
StringHashTable<StringHashMapSubMaps<TMapped, Alloc
return nullptr;
}
bool has_null_key_data() const { return false; }
+
+ void reserve(size_t num_elements) {
+ this->m1.init_buf_size(num_elements / 5);
+ this->m2.init_buf_size(num_elements / 5);
+ this->m3.init_buf_size(num_elements / 5);
+ this->m4.init_buf_size(num_elements / 5);
+ this->ms.init_buf_size(num_elements / 5);
+ }
Review Comment:
`StringHashMap::reserve()` is implemented via `init_buf_size()` on each
sub-map, which frees and reallocates the underlying buffers (see
HashMapTable::init_buf_size), destroying any existing entries. That makes
`reserve()` unsafe to call on a non-empty map, but the adaptive resize path
calls `hash_table->reserve(...)` after data has already been inserted. Consider
either (a) implementing a non-destructive reserve/rehash for StringHashMap, or
(b) renaming this API to something like `init_capacity()` and ensuring it is
only called on empty maps (and flushing/resetting before any resize).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]