Copilot commented on code in PR #61104:
URL: https://github.com/apache/doris/pull/61104#discussion_r2894198509
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
{.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
};
+static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, The L2 cache is generally 1024k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
Review Comment:
The comment says "The L2 cache is generally 1024k or more" but the
`min_ht_mem` threshold is set to `256 * 1024` (256KB). The comment is
inconsistent with the value. Please update the comment to match the chosen
threshold and explain the rationale.
##########
be/src/pipeline/exec/streaming_aggregation_operator.cpp:
##########
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
- {0, 0.0},
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
// At present, The L2 cache is generally 1024k or more
- {1024 * 1024, 1.1},
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
// Expand into main memory if we're getting a significant reduction.
// The L3 cache is generally 16MB or more
- {16 * 1024 * 1024, 2.0},
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, The L2 cache is generally 1024k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
Review Comment:
The comment says "The L2 cache is generally 1024k or more" but the
`min_ht_mem` threshold is set to `256 * 1024` (256KB). The comment is
inconsistent with the value. Please update the comment to match the chosen
threshold and explain the rationale.
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
{.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
};
+static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, The L2 cache is generally 1024k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};
+
Review Comment:
`STREAMING_HT_MIN_REDUCTION_SIZE` is computed from
`STREAMING_HT_MIN_REDUCTION` but also used to bound the loop when iterating
over `SINGLE_BE_STREAMING_HT_MIN_REDUCTION`. If the two tables ever diverge in
size, this would cause an out-of-bounds access or premature termination.
Consider either adding a `static_assert` that both arrays have the same size,
or computing a separate size constant for the single-BE table.
```suggestion
static_assert(sizeof(STREAMING_HT_MIN_REDUCTION) == sizeof(SINGLE_BE_STREAMING_HT_MIN_REDUCTION),
              "STREAMING_HT_MIN_REDUCTION and SINGLE_BE_STREAMING_HT_MIN_REDUCTION "
              "must have the same size");
```
##########
be/src/pipeline/exec/streaming_aggregation_operator.cpp:
##########
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
- {0, 0.0},
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
// At present, The L2 cache is generally 1024k or more
- {1024 * 1024, 1.1},
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
Review Comment:
The L2 cache `min_ht_mem` threshold was changed from `1024 * 1024` (1MB) to
`256 * 1024` (256KB) for the non-single-backend case. This is a behavioral
change to the default (multi-backend) streaming aggregation path that is not
mentioned in the PR description. Additionally, the comment on the preceding
line says "The L2 cache is generally 1024k or more", which is inconsistent with
the new 256KB threshold. Please either update the comment to explain the
rationale for 256KB, or confirm that this change is intentional and document it
in the PR description.
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -923,6 +923,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
if (!params.__isset.need_wait_execution_trigger ||
!params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}
+    query_ctx->set_single_backend_query(params.__isset.query_options &&
+                                        params.query_options.__isset.single_backend_query &&
+                                        params.query_options.single_backend_query);
Review Comment:
`set_single_backend_query` is called **after** `context->prepare()` (line
892), but `StreamingAggLocalState` reads `is_single_backend_query()` during its
construction inside `prepare`. At line 892, the local state is created and
`_is_single_backend` is initialized to the default `false`. Only at line 926 is
`set_single_backend_query` called on the `QueryContext`, but by then the local
state has already been constructed with the wrong value.
The fix should be to move `set_single_backend_query` before the `prepare()`
call, or read the flag directly from `_query_options` (which already contains
the correct value since it was set by the FE before sending the params).
Alternatively, you could read the value from `query_options` in the
`QueryContext` constructor or in `is_single_backend_query()` itself.
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
{.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
};
+static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, The L2 cache is generally 1024k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};
Review Comment:
The `StreamingHtMinReductionEntry` struct, both reduction tables
(`STREAMING_HT_MIN_REDUCTION` and `SINGLE_BE_STREAMING_HT_MIN_REDUCTION`), and
`STREAMING_HT_MIN_REDUCTION_SIZE` are now fully duplicated between
`streaming_aggregation_operator.cpp` and
`distinct_streaming_aggregation_operator.cpp`. Consider extracting these into a
shared header to avoid having to maintain two copies and keep them in sync.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]