Copilot commented on code in PR #61169:
URL: https://github.com/apache/doris/pull/61169#discussion_r2909891512


##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const 
VExprContextSPtrs& probe_
     return Status::OK();
 }
 
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block,
-                                          Block* output_block) {
-    if (low_memory_mode()) {
-        auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
-        p.set_low_memory_mode(state);
-    }
-    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
-    // pre stream agg need use _num_row_return to decide whether to do pre 
stream agg
-    _cur_num_rows_returned += output_block->rows();
-    make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block) {
+    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
     _update_memusage_with_serialized_key();
     return Status::OK();
 }
 
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
-    if (!_should_expand_hash_table) {
-        return false;
-    }
+/// Flush the hash table contents into a new Block and push it to 
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena, 
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+    SCOPED_TIMER(_flush_timer);
+    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+    const auto key_size = _probe_expr_ctxs.size();
+    const auto agg_size = _aggregate_evaluators.size();
 
     return std::visit(
             Overload {
-                    [&](std::monostate& arg) -> bool {
+                    [&](std::monostate& arg) -> Status {
                         throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                        return false;
+                        return Status::OK();
                     },
-                    [&](auto& agg_method) -> bool {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        auto [ht_mem, ht_rows] =
-                                std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
-                        // Need some rows in tables to have valid statistics.
-                        if (ht_rows == 0) {
-                            return true;
+                    [&](auto& agg_method) -> Status {
+                        auto& data = *agg_method.hash_table;
+                        const auto ht_size = data.size();
+                        if (ht_size == 0) {
+                            return Status::OK();
                         }
 
-                        // Find the appropriate reduction factor in our table 
for the current hash table sizes.
-                        int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-                            ++cache_level;
+                        _abandoned_count += ht_size;
+
+                        using KeyType = 
std::decay_t<decltype(agg_method)>::Key;
+
+                        MutableColumns key_columns;
+                        for (size_t i = 0; i < key_size; ++i) {
+                            key_columns.emplace_back(
+                                    
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+                        }
+
+                        MutableColumns value_columns(agg_size);
+                        DataTypes value_data_types(agg_size);
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            value_data_types[i] =
+                                    
_aggregate_evaluators[i]->function()->get_serialized_type();
+                            value_columns[i] =
+                                    
_aggregate_evaluators[i]->function()->create_serialize_column();
+                        }
+
+                        // Iterate through the AggregateDataContainer to 
extract keys and values.
+                        std::vector<KeyType> keys(ht_size);
+                        std::vector<AggregateDataPtr> values(ht_size);
+
+                        uint32_t num_rows = 0;
+                        auto iter = _aggregate_data_container->begin();
+                        while (iter != _aggregate_data_container->end() && 
num_rows < ht_size) {
+                            keys[num_rows] = iter.template get_key<KeyType>();
+                            values[num_rows] = iter.get_aggregate_data();
+                            ++iter;
+                            ++num_rows;
+                        }
+
+                        agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
+
+                        // Handle null key if present
+                        if (data.has_null_key_data()) {
+                            DCHECK(key_columns.size() == 1);
+                            DCHECK(key_columns[0]->is_nullable());
+                            key_columns[0]->insert_data(nullptr, 0);
+                            values.push_back(data.template 
get_null_key_data<AggregateDataPtr>());
+                            ++num_rows;
                         }
 
-                        // Compare the number of rows in the hash table with 
the number of input rows that
-                        // were aggregated into it. Exclude passed through 
rows from this calculation since
-                        // they were not in hash tables.
-                        const int64_t input_rows = _input_num_rows;
-                        const int64_t aggregated_input_rows = input_rows - 
_cur_num_rows_returned;
-                        // TODO chenhao
-                        //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
-                        double current_reduction = 
static_cast<double>(aggregated_input_rows) /
-                                                   
static_cast<double>(ht_rows);
-
-                        // TODO: workaround for IMPALA-2490: subplan node 
rows_returned counter may be
-                        // inaccurate, which could lead to a divide by zero 
below.
-                        if (aggregated_input_rows <= 0) {
-                            return true;
+                        // Serialize aggregate states to columns
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            
_aggregate_evaluators[i]->function()->serialize_to_column(
+                                    values, p._offsets_of_aggregate_states[i], 
value_columns[i],
+                                    num_rows);
                         }
 
-                        // Extrapolate the current reduction factor (r) using 
the formula
-                        // R = 1 + (N / n) * (r - 1), where R is the reduction 
factor over the full input data
-                        // set, N is the number of input rows, excluding 
passed-through rows, and n is the
-                        // number of rows inserted or merged into the hash 
tables. This is a very rough
-                        // approximation but is good enough to be useful.
-                        // TODO: consider collecting more statistics to better 
estimate reduction.
-                        //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
-                        //      ? current_reduction
-                        //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
-                        //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
-                        //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
-                        //  return estimated_reduction > min_reduction;
-                        _should_expand_hash_table = current_reduction > 
min_reduction;
-                        return _should_expand_hash_table;
+                        // Build the output block
+                        ColumnsWithTypeAndName columns_with_schema;
+                        for (size_t i = 0; i < key_size; ++i) {
+                            columns_with_schema.emplace_back(
+                                    std::move(key_columns[i]),
+                                    _probe_expr_ctxs[i]->root()->data_type(),
+                                    _probe_expr_ctxs[i]->root()->expr_name());
+                        }
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            
columns_with_schema.emplace_back(std::move(value_columns[i]),
+                                                             
value_data_types[i], "");
+                        }
+                        Block flush_block(columns_with_schema);
+                        make_nullable_output_key(&flush_block);
+                        _output_blocks.push_back(std::move(flush_block));
+

Review Comment:
   `_flush_and_reset_hash_table()` materializes the entire hash table into a 
single `flush_block` and pushes it to `_output_blocks`. With 
`MICRO_HT_CAPACITY` (131072 slots) this block can be very large (on the order 
of 10^5 rows), while other result paths (`_get_results_with_serialized_key`) 
cap output to `state->batch_size()`. Consider chunking the flush output into 
blocks of at most `batch_size` rows, or otherwise ensure that downstream 
operators can safely handle blocks much larger than `batch_size`.



##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -382,68 +432,127 @@ Status 
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::Block* in_blo
                 }
             }
         }
-        bool mem_reuse = p._make_nullable_keys.empty() && 
out_block->mem_reuse();
-
-        std::vector<DataTypePtr> data_types;
-        MutableColumns value_columns;
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            auto data_type = 
_aggregate_evaluators[i]->function()->get_serialized_type();
-            if (mem_reuse) {
-                value_columns.emplace_back(
-                        std::move(*out_block->get_by_position(i + 
key_size).column).mutate());
-            } else {
-                value_columns.emplace_back(
-                        
_aggregate_evaluators[i]->function()->create_serialize_column());
-            }
-            data_types.emplace_back(data_type);
-        }
 
-        for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
-            SCOPED_TIMER(_insert_values_to_column_timer);
-            
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
-                    in_block, value_columns[i], rows, _agg_arena_pool));
-        }
+        return _output_pass_through(in_block, key_columns, rows);
+    }
 
-        if (!mem_reuse) {
-            ColumnsWithTypeAndName columns_with_schema;
-            for (int i = 0; i < key_size; ++i) {
-                
columns_with_schema.emplace_back(key_columns[i]->clone_resized(rows),
-                                                 
_probe_expr_ctxs[i]->root()->data_type(),
-                                                 
_probe_expr_ctxs[i]->root()->expr_name());
-            }
-            for (int i = 0; i < value_columns.size(); ++i) {
-                columns_with_schema.emplace_back(std::move(value_columns[i]), 
data_types[i], "");
-            }
-            out_block->swap(Block(columns_with_schema));
-        } else {
-            for (int i = 0; i < key_size; ++i) {
-                std::move(*out_block->get_by_position(i).column)
-                        .mutate()
-                        ->insert_range_from(*key_columns[i], 0, rows);
-            }
-        }
-    } else {
-        bool need_agg = true;
-        if (need_do_sort_limit != 1) {
-            _emplace_into_hash_table(_places.data(), key_columns, rows);
-        } else {
-            need_agg = _emplace_into_hash_table_limit(_places.data(), 
in_block, key_columns, rows);
-        }
+    // --- Normal aggregation path (PROBING / RESIZED / STABLE) ---
+    _places.resize(rows);
 
-        if (need_agg) {
-            for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-                RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
-                        in_block, p._offsets_of_aggregate_states[i], 
_places.data(),
-                        _agg_arena_pool, _should_expand_hash_table));
-            }
-            if (limit > 0 && need_do_sort_limit == -1 && 
_get_hash_table_size() >= limit) {
-                need_do_sort_limit = 1;
-                build_limit_heap(_get_hash_table_size());
-            }
+    // Check if the adaptive decision needs to be made.
+    // We check before processing the current batch so that the decision takes 
effect
+    // starting from this batch.
+    const size_t sink_count_before = _sink_count;
+    _sink_count += rows;
+
+    if (_adaptive_phase == AdaptivePhase::PROBING && sink_count_before < 
ADAPTIVITY_THRESHOLD &&
+        _sink_count >= ADAPTIVITY_THRESHOLD) {
+        _check_adaptive_decision();
+
+        // If decision was pass-through, flush HT then handle this batch as 
pass-through
+        if (_adaptive_phase == AdaptivePhase::PASS_THROUGH) {
+            RETURN_IF_ERROR(_flush_and_reset_hash_table());
+            return _output_pass_through(in_block, key_columns, rows);
         }
     }
 
-    return Status::OK();
+    // Process the batch: row-by-row emplace with abandon-on-full logic.
+    return std::visit(
+            Overload {[&](std::monostate& arg) -> Status {
+                          throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                          return Status::OK();
+                      },
+                      [&](auto& agg_method) -> Status {
+                          SCOPED_TIMER(_hash_table_compute_timer);
+                          using HashMethodType = 
std::decay_t<decltype(agg_method)>;
+                          using AggState = typename HashMethodType::State;
+                          AggState state(key_columns);
+                          agg_method.init_serialized_keys(key_columns, rows);
+
+                          // Update HLL with hash values during PROBING phase
+                          if (_adaptive_phase == AdaptivePhase::PROBING && 
_hll) {
+                              for (uint32_t i = 0; i < rows; ++i) {
+                                  _hll->update(agg_method.hash_values[i]);
+                              }
+                          }
+
+                          auto creator = [this](const auto& ctor, auto& key, 
auto& origin) {
+                              HashMethodType::try_presis_key_and_origin(key, 
origin,
+                                                                        
_agg_arena_pool);
+                              auto mapped = 
_aggregate_data_container->append_data(origin);
+                              auto st = _create_agg_status(mapped);
+                              if (!st) {
+                                  throw Exception(st.code(), st.to_string());
+                              }
+                              ctor(key, mapped);
+                          };
+
+                          auto creator_for_null_key = [&](auto& mapped) {
+                              mapped = _agg_arena_pool.aligned_alloc(
+                                      p._total_size_of_aggregate_states, 
p._align_aggregate_states);
+                              auto st = _create_agg_status(mapped);
+                              if (!st) {
+                                  throw Exception(st.code(), st.to_string());
+                              }
+                          };
+
+                          // Initialize all places to nullptr so that
+                          // execute_batch_add_selected skips unprocessed rows.
+                          std::fill(_places.begin(), _places.begin() + rows, 
nullptr);
+
+                          {
+                              SCOPED_TIMER(_hash_table_emplace_timer);
+                              for (uint32_t i = 0; i < rows; ++i) {
+                                  auto& hash_tbl = *agg_method.hash_table;
+
+                                  // Check if HT is full before inserting 
(would-overflow check).
+                                  // If so, aggregate all pending rows, flush, 
then continue.
+                                  if (hash_tbl.add_elem_size_overflow(1)) {
+                                      // Aggregate the rows emplaced since the 
last flush.
+                                      // _places entries for unprocessed rows 
are nullptr and
+                                      // will be skipped by 
execute_batch_add_selected.
+                                      for (size_t j = 0; j < 
_aggregate_evaluators.size(); ++j) {
+                                          RETURN_IF_ERROR(
+                                                  _aggregate_evaluators[j]
+                                                          
->execute_batch_add_selected(
+                                                                  in_block,
+                                                                  
p._offsets_of_aggregate_states[j],
+                                                                  
_places.data(), _agg_arena_pool));
+                                      }
+                                      // Null out already-aggregated places so 
they won't be
+                                      // re-aggregated in subsequent calls.
+                                      for (uint32_t k = 0; k < i; ++k) {
+                                          _places[k] = nullptr;
+                                      }
+
+                                      
RETURN_IF_ERROR(_flush_and_reset_hash_table());
+                                  }
+
+                                  _places[i] = *agg_method.lazy_emplace(state, 
i, creator,
+                                                                        
creator_for_null_key);
+                              }
+                          }
+
+                          COUNTER_UPDATE(_hash_table_input_counter, rows);
+
+                          // Execute aggregate functions for remaining rows 
that haven't
+                          // been aggregated yet (non-null _places entries).
+                          for (size_t i = 0; i < _aggregate_evaluators.size(); 
++i) {
+                              
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
+                                      in_block, 
p._offsets_of_aggregate_states[i], _places.data(),
+                                      _agg_arena_pool));
+                          }
+
+                          // Handle sort limit: init heap if we've reached the 
limit
+                          if (limit > 0 && need_do_sort_limit == -1 &&
+                              _get_hash_table_size() >= limit) {
+                              need_do_sort_limit = 1;
+                              build_limit_heap(_get_hash_table_size());
+                          }

Review Comment:
   The sort-limit path for the normal aggregation mode appears incomplete: once 
`need_do_sort_limit` becomes 1 and `build_limit_heap()` is called, subsequent 
batches are no longer filtered by `_do_limit_filter()` (and the heap is never 
refreshed for new keys). Previously this was handled via 
`_emplace_into_hash_table_limit()`/`_refresh_limit_heap()`. As-is, 
`do_sort_limit` will not actually reduce incoming rows or constrain the hash 
table, which can lead to incorrect top-N by group key behavior and/or unbounded 
HT growth. Please reintroduce the limit-filter + heap-refresh logic on the 
aggregation path when `need_do_sort_limit == 1`.



##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const 
VExprContextSPtrs& probe_
     return Status::OK();
 }
 
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block,
-                                          Block* output_block) {
-    if (low_memory_mode()) {
-        auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
-        p.set_low_memory_mode(state);
-    }
-    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
-    // pre stream agg need use _num_row_return to decide whether to do pre 
stream agg
-    _cur_num_rows_returned += output_block->rows();
-    make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block) {
+    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
     _update_memusage_with_serialized_key();
     return Status::OK();
 }
 
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
-    if (!_should_expand_hash_table) {
-        return false;
-    }
+/// Flush the hash table contents into a new Block and push it to 
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena, 
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+    SCOPED_TIMER(_flush_timer);
+    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+    const auto key_size = _probe_expr_ctxs.size();
+    const auto agg_size = _aggregate_evaluators.size();
 
     return std::visit(
             Overload {
-                    [&](std::monostate& arg) -> bool {
+                    [&](std::monostate& arg) -> Status {
                         throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                        return false;
+                        return Status::OK();
                     },
-                    [&](auto& agg_method) -> bool {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        auto [ht_mem, ht_rows] =
-                                std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
-                        // Need some rows in tables to have valid statistics.
-                        if (ht_rows == 0) {
-                            return true;
+                    [&](auto& agg_method) -> Status {
+                        auto& data = *agg_method.hash_table;
+                        const auto ht_size = data.size();
+                        if (ht_size == 0) {
+                            return Status::OK();
                         }
 
-                        // Find the appropriate reduction factor in our table 
for the current hash table sizes.
-                        int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-                            ++cache_level;
+                        _abandoned_count += ht_size;
+
+                        using KeyType = 
std::decay_t<decltype(agg_method)>::Key;
+
+                        MutableColumns key_columns;
+                        for (size_t i = 0; i < key_size; ++i) {
+                            key_columns.emplace_back(
+                                    
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+                        }
+
+                        MutableColumns value_columns(agg_size);
+                        DataTypes value_data_types(agg_size);
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            value_data_types[i] =
+                                    
_aggregate_evaluators[i]->function()->get_serialized_type();
+                            value_columns[i] =
+                                    
_aggregate_evaluators[i]->function()->create_serialize_column();
+                        }
+
+                        // Iterate through the AggregateDataContainer to 
extract keys and values.
+                        std::vector<KeyType> keys(ht_size);
+                        std::vector<AggregateDataPtr> values(ht_size);
+

Review Comment:
   In `_flush_and_reset_hash_table()`, `ht_size` comes from `data.size()` 
(which includes the null-key row when `has_null_key_data()` is true), but the 
`keys`/`values` vectors are sized to `ht_size` while the 
AggregateDataContainer only stores non-null keys. As a result, in the 
null-key case the iteration fills only `ht_size - 1` slots, the subsequent 
`push_back` appends past the uninitialized last slot, and 
`serialize_to_column` then reads an uninitialized `AggregateDataPtr`, 
producing invalid pointers / crashes. Consider sizing the vectors to the 
container count (e.g., `ht_size - has_null_key`), and when a null key exists, 
write it into `values[num_rows]` (or resize before pushing) so that 
`values[0..num_rows-1]` are all valid.



##########
be/src/exec/operator/streaming_aggregation_operator.h:
##########
@@ -52,27 +66,42 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public 
PipelineXLocalState<Fak
     template <typename LocalStateType>
     friend class StatefulOperatorX;
 
-    size_t _memory_usage() const;
+    // --- Constants for adaptive streaming aggregation ---
+    /// Micro hash table slot capacity (2^17).
+    static constexpr size_t MICRO_HT_CAPACITY = 131072;
+    /// Row threshold for adaptive decision (2^20 = 1,048,576).
+    static constexpr size_t ADAPTIVITY_THRESHOLD = 1048576;
+    /// Rule A: cardinality ratio above which we go pass-through.
+    static constexpr double HIGH_CARDINALITY_RATIO = 0.95;
+    /// Rule B: abandoned_count / hll_count ratio above which we resize.
+    static constexpr double ABANDON_RATIO_THRESHOLD = 2.0;
+    /// Hard memory ceiling for Rule B resize (256MB).
+    static constexpr size_t MAX_HT_MEMORY_BYTES = 256ULL * 1024 * 1024;
+
     void _add_limit_heap_top(ColumnRawPtrs& key_columns, size_t rows);
     bool _do_limit_filter(size_t num_rows, ColumnRawPtrs& key_columns);
-    void _refresh_limit_heap(size_t i, ColumnRawPtrs& key_columns);
+    Status _pre_agg_with_serialized_key(doris::Block* in_block);
 
-    Status _pre_agg_with_serialized_key(doris::Block* in_block, doris::Block* 
out_block);
-    bool _should_expand_preagg_hash_tables();
-
-    MOCK_FUNCTION bool _should_not_do_pre_agg(size_t rows);
-
-    Status _execute_with_serialized_key(Block* block);
     void _update_memusage_with_serialized_key();
     Status _init_hash_method(const VExprContextSPtrs& probe_exprs);
     Status _get_results_with_serialized_key(RuntimeState* state, Block* block, 
bool* eos);
-    void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& 
key_columns,
-                                  const uint32_t num_rows);
-    bool _emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block,
-                                        ColumnRawPtrs& key_columns, uint32_t 
num_rows);
     Status _create_agg_status(AggregateDataPtr data);
     size_t _get_hash_table_size();
 

Review Comment:
   This change removes the `MOCK_FUNCTION _should_not_do_pre_agg(...)` path 
(and related helpers like `_should_expand_preagg_hash_tables()` / 
`_memory_usage()`), but there are existing unit tests that subclass 
`StreamingAggLocalState` and override/call these methods (e.g. 
`be/test/exec/operator/streaming_agg_operator_test.cpp`). The PR will break the 
build unless those tests (or any other subclasses) are updated to the new 
adaptive-phase mechanism, or compatible hooks are kept.



##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -1010,14 +980,18 @@ Status StreamingAggLocalState::close(RuntimeState* 
state) {
 Status StreamingAggOperatorX::pull(RuntimeState* state, Block* block, bool* 
eos) const {
     auto& local_state = get_local_state(state);
     SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
-    if (!local_state._pre_aggregated_block->empty()) {
-        local_state._pre_aggregated_block->swap(*block);
-    } else {
+    if (!local_state._output_blocks.empty()) {
+        // Pop a block from the output queue produced during the last push.
+        *block = std::move(local_state._output_blocks.front());
+        local_state._output_blocks.pop_front();
+    } else if (local_state._child_eos) {

Review Comment:
   `pull()` does not set `*eos` when returning a block from `_output_blocks`. 
If the caller passes in an `*eos` that is already true, this can prematurely 
signal EOS while there are still queued blocks or more input to process. Set 
`*eos = false` when popping a block from `_output_blocks`, and propagate EOS 
only when draining the hash table after `_child_eos` (or when 
`reached_limit()` sets it).



##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const 
VExprContextSPtrs& probe_
     return Status::OK();
 }
 
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block,
-                                          Block* output_block) {
-    if (low_memory_mode()) {
-        auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
-        p.set_low_memory_mode(state);
-    }
-    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
-    // pre stream agg need use _num_row_return to decide whether to do pre 
stream agg
-    _cur_num_rows_returned += output_block->rows();
-    make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block) {
+    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
     _update_memusage_with_serialized_key();
     return Status::OK();
 }
 
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
-    if (!_should_expand_hash_table) {
-        return false;
-    }
+/// Flush the hash table contents into a new Block and push it to 
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena, 
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+    SCOPED_TIMER(_flush_timer);
+    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+    const auto key_size = _probe_expr_ctxs.size();
+    const auto agg_size = _aggregate_evaluators.size();
 
     return std::visit(
             Overload {
-                    [&](std::monostate& arg) -> bool {
+                    [&](std::monostate& arg) -> Status {
                         throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                        return false;
+                        return Status::OK();
                     },
-                    [&](auto& agg_method) -> bool {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        auto [ht_mem, ht_rows] =
-                                std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
-                        // Need some rows in tables to have valid statistics.
-                        if (ht_rows == 0) {
-                            return true;
+                    [&](auto& agg_method) -> Status {
+                        auto& data = *agg_method.hash_table;
+                        const auto ht_size = data.size();
+                        if (ht_size == 0) {
+                            return Status::OK();
                         }
 
-                        // Find the appropriate reduction factor in our table 
for the current hash table sizes.
-                        int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-                            ++cache_level;
+                        _abandoned_count += ht_size;
+
+                        using KeyType = 
std::decay_t<decltype(agg_method)>::Key;
+
+                        MutableColumns key_columns;
+                        for (size_t i = 0; i < key_size; ++i) {
+                            key_columns.emplace_back(
+                                    
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+                        }
+
+                        MutableColumns value_columns(agg_size);
+                        DataTypes value_data_types(agg_size);
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            value_data_types[i] =
+                                    
_aggregate_evaluators[i]->function()->get_serialized_type();
+                            value_columns[i] =
+                                    
_aggregate_evaluators[i]->function()->create_serialize_column();
+                        }
+
+                        // Iterate through the AggregateDataContainer to 
extract keys and values.
+                        std::vector<KeyType> keys(ht_size);
+                        std::vector<AggregateDataPtr> values(ht_size);
+
+                        uint32_t num_rows = 0;
+                        auto iter = _aggregate_data_container->begin();
+                        while (iter != _aggregate_data_container->end() && 
num_rows < ht_size) {
+                            keys[num_rows] = iter.template get_key<KeyType>();
+                            values[num_rows] = iter.get_aggregate_data();
+                            ++iter;
+                            ++num_rows;
+                        }
+
+                        agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
+
+                        // Handle null key if present
+                        if (data.has_null_key_data()) {
+                            DCHECK(key_columns.size() == 1);
+                            DCHECK(key_columns[0]->is_nullable());
+                            key_columns[0]->insert_data(nullptr, 0);
+                            values.push_back(data.template 
get_null_key_data<AggregateDataPtr>());
+                            ++num_rows;
                         }
 
-                        // Compare the number of rows in the hash table with 
the number of input rows that
-                        // were aggregated into it. Exclude passed through 
rows from this calculation since
-                        // they were not in hash tables.
-                        const int64_t input_rows = _input_num_rows;
-                        const int64_t aggregated_input_rows = input_rows - 
_cur_num_rows_returned;
-                        // TODO chenhao
-                        //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
-                        double current_reduction = 
static_cast<double>(aggregated_input_rows) /
-                                                   
static_cast<double>(ht_rows);
-
-                        // TODO: workaround for IMPALA-2490: subplan node 
rows_returned counter may be
-                        // inaccurate, which could lead to a divide by zero 
below.
-                        if (aggregated_input_rows <= 0) {
-                            return true;
+                        // Serialize aggregate states to columns
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            
_aggregate_evaluators[i]->function()->serialize_to_column(
+                                    values, p._offsets_of_aggregate_states[i], 
value_columns[i],
+                                    num_rows);
                         }
 
-                        // Extrapolate the current reduction factor (r) using 
the formula
-                        // R = 1 + (N / n) * (r - 1), where R is the reduction 
factor over the full input data
-                        // set, N is the number of input rows, excluding 
passed-through rows, and n is the
-                        // number of rows inserted or merged into the hash 
tables. This is a very rough
-                        // approximation but is good enough to be useful.
-                        // TODO: consider collecting more statistics to better 
estimate reduction.
-                        //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
-                        //      ? current_reduction
-                        //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
-                        //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
-                        //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
-                        //  return estimated_reduction > min_reduction;
-                        _should_expand_hash_table = current_reduction > 
min_reduction;
-                        return _should_expand_hash_table;
+                        // Build the output block
+                        ColumnsWithTypeAndName columns_with_schema;
+                        for (size_t i = 0; i < key_size; ++i) {
+                            columns_with_schema.emplace_back(
+                                    std::move(key_columns[i]),
+                                    _probe_expr_ctxs[i]->root()->data_type(),
+                                    _probe_expr_ctxs[i]->root()->expr_name());
+                        }
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            
columns_with_schema.emplace_back(std::move(value_columns[i]),
+                                                             
value_data_types[i], "");
+                        }
+                        Block flush_block(columns_with_schema);
+                        make_nullable_output_key(&flush_block);
+                        _output_blocks.push_back(std::move(flush_block));
+
+                        // Destroy all aggregate states
+                        data.for_each_mapped([&](auto& mapped) {
+                            if (mapped) {
+                                _destroy_agg_status(mapped);
+                                mapped = nullptr;
+                            }
+                        });
+                        if (data.has_null_key_data()) {
+                            _destroy_agg_status(
+                                    data.template 
get_null_key_data<AggregateDataPtr>());
+                        }
+
+                        // Clear the hash table (keeps capacity/buffer 
allocated)
+                        data.clear();
+
+                        // Reset arena and aggregate data container
+                        _agg_arena_pool.clear();
+                        _reset_aggregate_data_container();
+
+                        return Status::OK();
                     }},
             _agg_data->method_variant);
 }
 
-size_t StreamingAggLocalState::_memory_usage() const {
-    size_t usage = 0;
-    usage += _agg_arena_pool.size();
-
-    if (_aggregate_data_container) {
-        usage += _aggregate_data_container->memory_usage();
-    }
-
+void StreamingAggLocalState::_reset_aggregate_data_container() {
+    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
     std::visit(Overload {[&](std::monostate& arg) -> void {
                              throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                                     "uninited hash table");
                          },
                          [&](auto& agg_method) {
-                             usage += 
agg_method.hash_table->get_buffer_size_in_bytes();
+                             using HashTableType = 
std::decay_t<decltype(agg_method)>;
+                             using KeyType = typename HashTableType::Key;
+                             _aggregate_data_container = 
std::make_unique<AggregateDataContainer>(
+                                     sizeof(KeyType), 
((p._total_size_of_aggregate_states +
+                                                        
p._align_aggregate_states - 1) /
+                                                       
p._align_aggregate_states) *
+                                                              
p._align_aggregate_states);
                          }},
                _agg_data->method_variant);
+}
 
-    return usage;
+void StreamingAggLocalState::_check_adaptive_decision() {
+    DCHECK(_sink_count >= ADAPTIVITY_THRESHOLD);
+    DCHECK(_hll != nullptr);
+
+    const int64_t hll_count = _hll->estimate_cardinality();
+    const double cardinality_ratio =
+            static_cast<double>(hll_count) / 
static_cast<double>(ADAPTIVITY_THRESHOLD);
+
+    // Rule A: extremely high cardinality → pass-through
+    if (cardinality_ratio > HIGH_CARDINALITY_RATIO) {
+        _adaptive_phase = AdaptivePhase::PASS_THROUGH;
+        _hll.reset();
+        return;
+    }
+
+    // Rule B: hash table capacity bottleneck → resize
+    if (hll_count > 0 && static_cast<double>(_abandoned_count) / 
static_cast<double>(hll_count) >
+                                 ABANDON_RATIO_THRESHOLD) {
+        _adaptive_phase = AdaptivePhase::RESIZED;
+        _hll.reset();
+
+        // Calculate target capacity: hll_count * 1.5, capped by 
MAX_HT_MEMORY_BYTES.
+        const auto target_elements = static_cast<size_t>(double(hll_count) * 
1.5);
+        std::visit(Overload {[&](std::monostate& arg) -> void {
+                                 throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                        "uninited hash table");
+                             },
+                             [&](auto& agg_method) -> void {
+                                 using HashMapType =
+                                         typename 
std::decay_t<decltype(agg_method)>::HashMapType;
+                                 // Estimate memory for the target capacity.
+                                 // PHHashMap slot_type size gives us a rough 
per-slot memory cost.
+                                 const size_t per_slot_bytes =
+                                         sizeof(typename 
HashMapType::value_type) +
+                                         1 /* overhead */;
+                                 // PHHashMap reserves capacity rounded up by 
phmap internally.
+                                 size_t capped_elements = target_elements;
+                                 if (capped_elements * per_slot_bytes > 
MAX_HT_MEMORY_BYTES) {
+                                     capped_elements = MAX_HT_MEMORY_BYTES / 
per_slot_bytes;
+                                 }
+                                 
agg_method.hash_table->reserve(capped_elements);
+                             }},
+                   _agg_data->method_variant);

Review Comment:
   In `_check_adaptive_decision()` Rule B, calling 
`agg_method.hash_table->reserve(capped_elements)` will rehash/resize the 
underlying hash table while it already contains aggregated data. This is fine 
for PHHashMap, but for StringHashMap the newly added `reserve()` is destructive 
(it re-initializes the buffers), which would drop the partial aggregation state 
and corrupt results. Please guard this resize so it only runs for hash table 
types where it is safe, or flush/reset the current hash table before reserving 
the new capacity, or implement a non-destructive reserve for StringHashMap.



##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -199,141 +162,227 @@ Status StreamingAggLocalState::_init_hash_method(const 
VExprContextSPtrs& probe_
     return Status::OK();
 }
 
-Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block,
-                                          Block* output_block) {
-    if (low_memory_mode()) {
-        auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
-        p.set_low_memory_mode(state);
-    }
-    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
-
-    // pre stream agg need use _num_row_return to decide whether to do pre 
stream agg
-    _cur_num_rows_returned += output_block->rows();
-    make_nullable_output_key(output_block);
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, Block* 
input_block) {
+    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block));
     _update_memusage_with_serialized_key();
     return Status::OK();
 }
 
-bool StreamingAggLocalState::_should_expand_preagg_hash_tables() {
-    if (!_should_expand_hash_table) {
-        return false;
-    }
+/// Flush the hash table contents into a new Block and push it to 
_output_blocks.
+/// Then clear all aggregation state: destroy agg states, clear HT, arena, 
container.
+Status StreamingAggLocalState::_flush_and_reset_hash_table() {
+    SCOPED_TIMER(_flush_timer);
+    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+    const auto key_size = _probe_expr_ctxs.size();
+    const auto agg_size = _aggregate_evaluators.size();
 
     return std::visit(
             Overload {
-                    [&](std::monostate& arg) -> bool {
+                    [&](std::monostate& arg) -> Status {
                         throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                        return false;
+                        return Status::OK();
                     },
-                    [&](auto& agg_method) -> bool {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        auto [ht_mem, ht_rows] =
-                                std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
-                        // Need some rows in tables to have valid statistics.
-                        if (ht_rows == 0) {
-                            return true;
+                    [&](auto& agg_method) -> Status {
+                        auto& data = *agg_method.hash_table;
+                        const auto ht_size = data.size();
+                        if (ht_size == 0) {
+                            return Status::OK();
                         }
 
-                        // Find the appropriate reduction factor in our table 
for the current hash table sizes.
-                        int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-                            ++cache_level;
+                        _abandoned_count += ht_size;
+
+                        using KeyType = 
std::decay_t<decltype(agg_method)>::Key;
+
+                        MutableColumns key_columns;
+                        for (size_t i = 0; i < key_size; ++i) {
+                            key_columns.emplace_back(
+                                    
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+                        }
+
+                        MutableColumns value_columns(agg_size);
+                        DataTypes value_data_types(agg_size);
+                        for (size_t i = 0; i < agg_size; ++i) {
+                            value_data_types[i] =
+                                    
_aggregate_evaluators[i]->function()->get_serialized_type();
+                            value_columns[i] =
+                                    
_aggregate_evaluators[i]->function()->create_serialize_column();
+                        }
+
+                        // Iterate through the AggregateDataContainer to 
extract keys and values.
+                        std::vector<KeyType> keys(ht_size);
+                        std::vector<AggregateDataPtr> values(ht_size);
+
+                        uint32_t num_rows = 0;
+                        auto iter = _aggregate_data_container->begin();
+                        while (iter != _aggregate_data_container->end() && 
num_rows < ht_size) {
+                            keys[num_rows] = iter.template get_key<KeyType>();
+                            values[num_rows] = iter.get_aggregate_data();
+                            ++iter;
+                            ++num_rows;
+                        }
+
+                        agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
+
+                        // Handle null key if present
+                        if (data.has_null_key_data()) {
+                            DCHECK(key_columns.size() == 1);
+                            DCHECK(key_columns[0]->is_nullable());
+                            key_columns[0]->insert_data(nullptr, 0);
+                            values.push_back(data.template 
get_null_key_data<AggregateDataPtr>());
+                            ++num_rows;
                         }

Review Comment:
   When `data.has_null_key_data()` is true, `values.push_back(...)` appends the 
null-key aggregate state at the end, but `num_rows` is incremented and 
serialization only consumes the first `num_rows` entries. Because `values` was 
pre-sized to `ht_size`, this leaves a “gap” (the old `values[ht_size-1]`) that 
holds a value-initialized null pointer yet still gets serialized, while the 
appended null-key state can be skipped. Prefer assigning the null-key value 
into `values[num_rows]` before incrementing, or shrinking `values` to 
`num_rows` before the `push_back()`.



##########
be/src/exec/common/hash_table/string_hash_map.h:
##########
@@ -155,5 +155,31 @@ class StringHashMap : public 
StringHashTable<StringHashMapSubMaps<TMapped, Alloc
         return nullptr;
     }
     bool has_null_key_data() const { return false; }
+
+    void reserve(size_t num_elements) {
+        this->m1.init_buf_size(num_elements / 5);
+        this->m2.init_buf_size(num_elements / 5);
+        this->m3.init_buf_size(num_elements / 5);
+        this->m4.init_buf_size(num_elements / 5);
+        this->ms.init_buf_size(num_elements / 5);
+    }

Review Comment:
   `StringHashMap::reserve()` is implemented via `init_buf_size()` on each 
sub-map, which frees and reallocates the underlying buffers (see 
HashMapTable::init_buf_size), destroying any existing entries. That makes 
`reserve()` unsafe to call on a non-empty map, but the adaptive resize path 
calls `hash_table->reserve(...)` after data has already been inserted. Consider 
either (a) implementing a non-destructive reserve/rehash for StringHashMap, or 
(b) renaming this API to something like `init_capacity()` and ensuring it is 
only called on empty maps (and flushing/resetting before any resize).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to