Copilot commented on code in PR #61104:
URL: https://github.com/apache/doris/pull/61104#discussion_r2894198509


##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry 
STREAMING_HT_MIN_REDUCTION[] = {
         {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
 };
 
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, The L2 cache is generally 1024k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},

Review Comment:
   The comment says "The L2 cache is generally 1024k or more" but the 
`min_ht_mem` threshold is set to `256 * 1024` (256KB). The comment is 
inconsistent with the value. Please update the comment to match the chosen 
threshold and explain the rationale.



##########
be/src/pipeline/exec/streaming_aggregation_operator.cpp:
##########
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
 // of the machine that we're running on.
 static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
         // Expand up to L2 cache always.
-        {0, 0.0},
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
         // Expand into L3 cache if we look like we're getting some reduction.
         // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
         // Expand into main memory if we're getting a significant reduction.
         // The L3 cache is generally 16MB or more
-        {16 * 1024 * 1024, 2.0},
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, The L2 cache is generally 1024k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},

Review Comment:
   The comment says "The L2 cache is generally 1024k or more" but the 
`min_ht_mem` threshold is set to `256 * 1024` (256KB). The comment is 
inconsistent with the value. Please update the comment to match the chosen 
threshold and explain the rationale.



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry 
STREAMING_HT_MIN_REDUCTION[] = {
         {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
 };
 
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, The L2 cache is generally 1024k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};
+

Review Comment:
   `STREAMING_HT_MIN_REDUCTION_SIZE` is computed from 
`STREAMING_HT_MIN_REDUCTION` but also used to bound the loop when iterating 
over `SINGLE_BE_STREAMING_HT_MIN_REDUCTION`. If the two tables ever diverge in 
size, this would cause an out-of-bounds access or premature termination. 
Consider either adding a `static_assert` that both arrays have the same size, 
or computing a separate size constant for the single-BE table.
   ```suggestion
   
   static_assert(sizeof(STREAMING_HT_MIN_REDUCTION) ==
                         sizeof(SINGLE_BE_STREAMING_HT_MIN_REDUCTION),
                 "STREAMING_HT_MIN_REDUCTION and 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION "
                 "must have the same size");
   ```



##########
be/src/pipeline/exec/streaming_aggregation_operator.cpp:
##########
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
 // of the machine that we're running on.
 static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
         // Expand up to L2 cache always.
-        {0, 0.0},
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
         // Expand into L3 cache if we look like we're getting some reduction.
         // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},

Review Comment:
   The L2 cache `min_ht_mem` threshold was changed from `1024 * 1024` (1MB) to 
`256 * 1024` (256KB) for the non-single-backend case. This is a behavioral 
change to the default (multi-backend) streaming aggregation path that is not 
mentioned in the PR description. Additionally, the comment on the preceding 
line says "The L2 cache is generally 1024k or more", which is inconsistent with 
the new 256KB threshold. Please either update the comment to explain the 
rationale for 256KB, or confirm that this change is intentional and document it 
in the PR description.



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -923,6 +923,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     if (!params.__isset.need_wait_execution_trigger || 
!params.need_wait_execution_trigger) {
         query_ctx->set_ready_to_execute_only();
     }
+    query_ctx->set_single_backend_query(params.__isset.query_options &&
+                                        
params.query_options.__isset.single_backend_query &&
+                                        
params.query_options.single_backend_query);

Review Comment:
   `set_single_backend_query` is called **after** `context->prepare()` (line 
892), but `StreamingAggLocalState` reads `is_single_backend_query()` during its 
construction inside `prepare`. At line 892, the local state is created and 
`_is_single_backend` is initialized to the default `false`. Only at line 926 is 
`set_single_backend_query` called on the `QueryContext`, but by then the local 
state has already been constructed with the wrong value. 
   
   The fix should be to move `set_single_backend_query` before the `prepare()` 
call, or read the flag directly from `_query_options` (which already contains 
the correct value since it was set by the FE before sending the params). 
Alternatively, you could read the value from `query_options` in the 
`QueryContext` constructor or in `is_single_backend_query()` itself.



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry 
STREAMING_HT_MIN_REDUCTION[] = {
         {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
 };
 
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, The L2 cache is generally 1024k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};

Review Comment:
   The `StreamingHtMinReductionEntry` struct, both reduction tables 
(`STREAMING_HT_MIN_REDUCTION` and `SINGLE_BE_STREAMING_HT_MIN_REDUCTION`), and 
`STREAMING_HT_MIN_REDUCTION_SIZE` are now fully duplicated between 
`streaming_aggregation_operator.cpp` and 
`distinct_streaming_aggregation_operator.cpp`. Consider extracting these into a 
shared header to avoid having to maintain two copies and keep them in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to