Copilot commented on code in PR #61326:
URL: https://github.com/apache/doris/pull/61326#discussion_r2934656111
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -180,65 +185,64 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
}
return std::visit(
- Overload {
- [&](std::monostate& arg) -> bool {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- return false;
- },
- [&](auto& agg_method) -> bool {
- auto& hash_tbl = *agg_method.hash_table;
- auto [ht_mem, ht_rows] =
- std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) {
- return true;
- }
+ Overload {[&](std::monostate& arg) -> bool {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return false;
+ },
+ [&](auto& agg_method) -> bool {
+ auto& hash_tbl = *agg_method.hash_table;
+ auto [ht_mem, ht_rows] =
+ std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
- const auto* reduction = _is_single_backend
- ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
- :
STREAMING_HT_MIN_REDUCTION;
+ // Need some rows in tables to have valid statistics.
+ if (ht_rows == 0) {
+ return true;
+ }
- // Find the appropriate reduction factor in our table
for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >= reduction[cache_level +
1].min_ht_mem) {
- ++cache_level;
- }
+ const auto* reduction = _is_single_backend
+ ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
STREAMING_HT_MIN_REDUCTION;
Review Comment:
`const auto* reduction = _is_single_backend ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION : STREAMING_HT_MIN_REDUCTION;` mixes a
constexpr array (`SINGLE_BE_*`) with a `std::vector` reference
(`STREAMING_HT_MIN_REDUCTION`), so the conditional operator has no common
pointer type and this will not compile. Additionally,
`SINGLE_BE_STREAMING_HT_MIN_REDUCTION` is left undefined in this TU now that
`streaming_agg_min_reduction.h` is no longer included. Use a consistent
representation for both branches (e.g., `std::span<const Entry>`, or two
`std::vector<Entry>`s), and include/define the single-backend table using the
same `Entry` type as the dynamic table.
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -24,18 +24,23 @@
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
-#include "exec/operator/operator.h"
-#include "exec/operator/streaming_agg_min_reduction.h"
-#include "exprs/aggregate/aggregate_function_simple_factory.h"
-#include "exprs/vectorized_agg_fn.h"
-#include "exprs/vslot_ref.h"
+#include "pipeline/exec/operator.h"
+#include "util/cpu_info.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
Review Comment:
`#include "pipeline/exec/operator.h"` does not exist under
`be/src/pipeline/exec/` (only `partitioned_aggregation_source_operator.h` is
present). This will fail to compile; the include should reference an existing
header (likely the previous `exec/operator/operator.h` or another correct
pipeline operator header).
##########
be/src/exec/operator/streaming_aggregation_operator.cpp:
##########
@@ -24,18 +24,23 @@
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
-#include "exec/operator/operator.h"
-#include "exec/operator/streaming_agg_min_reduction.h"
-#include "exprs/aggregate/aggregate_function_simple_factory.h"
-#include "exprs/vectorized_agg_fn.h"
-#include "exprs/vslot_ref.h"
+#include "pipeline/exec/operator.h"
+#include "util/cpu_info.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeState;
} // namespace doris
-namespace doris {
+namespace doris::pipeline {
+
+using StreamingHtMinReductionEntry =
doris::CpuInfo::StreamingHtMinReductionEntry;
+static const std::vector<StreamingHtMinReductionEntry>&
STREAMING_HT_MIN_REDUCTION =
+ doris::CpuInfo::get_streaming_ht_min_reduction();
+static const size_t STREAMING_HT_MIN_REDUCTION_SIZE =
STREAMING_HT_MIN_REDUCTION.size();
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
Review Comment:
This .cpp switches to `namespace doris::pipeline`, but the corresponding
header `streaming_aggregation_operator.h` declares
`StreamingAggLocalState`/`StreamingAggOperatorX` in `namespace doris`. As-is,
the definitions in this file won’t match the declarations and will not
link/compile. Either keep the implementation in `namespace doris` or update the
header (and all references) to the new namespace consistently.
##########
be/src/util/cpu_info.h:
##########
@@ -148,6 +148,56 @@ class CpuInfo {
static std::string debug_string();
+ static long get_cache_size(CacheLevel level) {
+ long cache_sizes[NUM_CACHE_LEVELS];
+ long cache_line_sizes[NUM_CACHE_LEVELS];
+ _get_cache_info(cache_sizes, cache_line_sizes);
+ return cache_sizes[level];
+ }
+
+ static long get_cache_line_size(CacheLevel level) {
+ long cache_sizes[NUM_CACHE_LEVELS];
+ long cache_line_sizes[NUM_CACHE_LEVELS];
+ _get_cache_info(cache_sizes, cache_line_sizes);
+ return cache_line_sizes[level];
+ }
+
+ struct StreamingHtMinReductionEntry {
+ long min_ht_mem;
+ double streaming_ht_min_reduction;
+ };
+
+ static const std::vector<StreamingHtMinReductionEntry>&
get_streaming_ht_min_reduction() {
+ static std::vector<StreamingHtMinReductionEntry> entries;
+ static bool initialized = false;
+
+ if (!initialized) {
+ long l2_cache_size = CpuInfo::get_cache_size(CpuInfo::L2_CACHE);
+ long l3_cache_size = CpuInfo::get_cache_size(CpuInfo::L3_CACHE);
+
+ entries.push_back({.min_ht_mem = 0, .streaming_ht_min_reduction =
0.0});
+
+ if (l2_cache_size > 256 * 1024) {
+ entries.push_back(
+ {.min_ht_mem = l2_cache_size / 4,
.streaming_ht_min_reduction = 1.1});
+ } else {
+ entries.push_back({.min_ht_mem = 256 * 1024,
.streaming_ht_min_reduction = 1.1});
+ }
+
+ if (l3_cache_size > 4 * 1024 * 1024) {
+ entries.push_back(
+ {.min_ht_mem = l3_cache_size / 2,
.streaming_ht_min_reduction = 2.0});
+ } else {
+ entries.push_back(
+ {.min_ht_mem = 16 * 1024 * 1024,
.streaming_ht_min_reduction = 2.0});
+ }
+
+ initialized = true;
+ }
Review Comment:
`get_streaming_ht_min_reduction()` guards mutation of a static `entries` vector
with a non-atomic static `initialized` flag. This is not thread-safe:
concurrent first calls can race, pushing duplicate entries or reading the
vector while it is being populated. Prefer C++11 magic statics, which already
provide thread-safe one-time initialization — e.g., `static const auto entries
= [] { ... return v; }();` — or use `std::call_once`, and return a reference
to the fully-initialized container.
##########
be/src/util/cpu_info.h:
##########
@@ -148,6 +148,56 @@ class CpuInfo {
static std::string debug_string();
+ static long get_cache_size(CacheLevel level) {
+ long cache_sizes[NUM_CACHE_LEVELS];
+ long cache_line_sizes[NUM_CACHE_LEVELS];
+ _get_cache_info(cache_sizes, cache_line_sizes);
+ return cache_sizes[level];
+ }
+
+ static long get_cache_line_size(CacheLevel level) {
+ long cache_sizes[NUM_CACHE_LEVELS];
+ long cache_line_sizes[NUM_CACHE_LEVELS];
+ _get_cache_info(cache_sizes, cache_line_sizes);
+ return cache_line_sizes[level];
+ }
+
+ struct StreamingHtMinReductionEntry {
+ long min_ht_mem;
+ double streaming_ht_min_reduction;
+ };
+
+ static const std::vector<StreamingHtMinReductionEntry>&
get_streaming_ht_min_reduction() {
+ static std::vector<StreamingHtMinReductionEntry> entries;
Review Comment:
CpuInfo introduces a new `StreamingHtMinReductionEntry` type, but an
identically-named entry struct already exists in
`be/src/exec/operator/streaming_agg_min_reduction.h`. This duplication is
likely to cause type mismatches/ambiguous usage across TUs (and the operator
code currently mixes both). Consider reusing the existing struct (or moving it
to a shared header) so both the dynamic and single-backend tables share the
same type.
##########
be/src/util/cpu_info.h:
##########
@@ -148,6 +148,56 @@ class CpuInfo {
static std::string debug_string();
+ static long get_cache_size(CacheLevel level) {
+ long cache_sizes[NUM_CACHE_LEVELS];
+ long cache_line_sizes[NUM_CACHE_LEVELS];
+ _get_cache_info(cache_sizes, cache_line_sizes);
+ return cache_sizes[level];
+ }
+
+ static long get_cache_line_size(CacheLevel level) {
+ long cache_sizes[NUM_CACHE_LEVELS];
+ long cache_line_sizes[NUM_CACHE_LEVELS];
+ _get_cache_info(cache_sizes, cache_line_sizes);
+ return cache_line_sizes[level];
+ }
Review Comment:
The new public `get_cache_size()` / `get_cache_line_size()` wrappers expose
cache info that `_get_cache_info()` explicitly notes can be unreliable (e.g.,
returning 0 on some systems). Since these are now part of the public CpuInfo
API, it would be helpful to document that callers must handle 0/unknown values
(and possibly that values may be best-effort only).
##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -77,65 +82,64 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
}
return std::visit(
- Overload {
- [&](std::monostate& arg) -> bool {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- return false;
- },
- [&](auto& agg_method) -> bool {
- auto& hash_tbl = *agg_method.hash_table;
- auto [ht_mem, ht_rows] =
- std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
- // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) {
- return true;
- }
-
- const auto* reduction = _is_single_backend
- ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
- :
STREAMING_HT_MIN_REDUCTION;
-
- // Find the appropriate reduction factor in our table
for the current hash table sizes.
- int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >= reduction[cache_level +
1].min_ht_mem) {
- ++cache_level;
- }
-
- // Compare the number of rows in the hash table with
the number of input rows that
- // were aggregated into it. Exclude passed through
rows from this calculation since
- // they were not in hash tables.
- const int64_t input_rows = _input_num_rows;
- const int64_t aggregated_input_rows = input_rows -
_num_rows_returned;
- // TODO chenhao
- // const int64_t expected_input_rows =
estimated_input_cardinality_ - num_rows_returned_;
- double current_reduction =
static_cast<double>(aggregated_input_rows) /
-
static_cast<double>(ht_rows);
-
- // TODO: workaround for IMPALA-2490: subplan node
rows_returned counter may be
- // inaccurate, which could lead to a divide by zero
below.
- if (aggregated_input_rows <= 0) {
- return true;
- }
-
- // Extrapolate the current reduction factor (r) using
the formula
- // R = 1 + (N / n) * (r - 1), where R is the reduction
factor over the full input data
- // set, N is the number of input rows, excluding
passed-through rows, and n is the
- // number of rows inserted or merged into the hash
tables. This is a very rough
- // approximation but is good enough to be useful.
- // TODO: consider collecting more statistics to better
estimate reduction.
- // double estimated_reduction = aggregated_input_rows
>= expected_input_rows
- // ? current_reduction
- // : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
-
- // COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
- // COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
- // return estimated_reduction > min_reduction;
- _should_expand_hash_table = current_reduction >
min_reduction;
- return _should_expand_hash_table;
- }},
+ Overload {[&](std::monostate& arg) -> bool {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return false;
+ },
+ [&](auto& agg_method) -> bool {
+ auto& hash_tbl = *agg_method.hash_table;
+ auto [ht_mem, ht_rows] =
+ std::pair
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
+
+ // Need some rows in tables to have valid statistics.
+ if (ht_rows == 0) {
+ return true;
+ }
+
+ const auto* reduction = _is_single_backend
+ ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
STREAMING_HT_MIN_REDUCTION;
+
+ // Find the appropriate reduction factor in our
table for the current hash table sizes.
+ int cache_level = 0;
+ while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
Review Comment:
`const auto* reduction = _is_single_backend ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION : STREAMING_HT_MIN_REDUCTION;` has two
compile issues: (1) `SINGLE_BE_STREAMING_HT_MIN_REDUCTION` is no longer
included/defined in this TU after removing `streaming_agg_min_reduction.h`, and
(2) the conditional operator mixes an array and a `std::vector` reference, so
there is no valid common pointer type. Use a consistent container/type for both
branches (e.g., `std::span<const Entry>` or `std::vector<Entry>`), and ensure
the single-backend table is available with the same `Entry` type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]