github-actions[bot] commented on code in PR #61617:
URL: https://github.com/apache/doris/pull/61617#discussion_r2980249873


##########
be/src/exec/scan/scanner_context.h:
##########
@@ -115,7 +115,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
                    const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
-                   std::shared_ptr<Dependency> dependency
+                   std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit
 #ifdef BE_TEST

Review Comment:
   **[Critical/Build-Breaking]** This constructor signature change adds 
`shared_scan_limit` as a new required parameter between `dependency` and the 
`BE_TEST`-only `num_parallel_instances`. However, **none of the 14 call sites in 
`be/test/exec/scan/scanner_context_test.cpp` were updated**.
   
   In `BE_TEST` mode, the constructor now expects 9 arguments:
   ```
   (state, local_state, output_tuple_desc, output_row_descriptor, scanners, 
limit_, dependency, shared_scan_limit, num_parallel_instances)
   ```
   
   But the tests still pass only 8 arguments, with the 8th being an `int` 
(`parallel_tasks`) that lands in the position of the `std::atomic<int64_t>*` 
parameter. A non-constant `int` does not convert implicitly to a pointer, so 
these calls should fail to compile rather than build with undefined behavior.
   
   The test file needs:
   1. A `std::atomic<int64_t>` member (e.g., `std::atomic<int64_t> shared_limit 
{-1};`) in the test fixture.
   2. All `create_shared` calls updated to pass `&shared_limit` before 
`parallel_tasks`.
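
   A minimal sketch of the suggested fixture change follows. It does not use 
the real Doris classes: `FakeScannerContext` and `FakeScannerContextFixture` 
are hypothetical stand-ins that mirror only the tail of the `BE_TEST` 
constructor signature, and `make_context` is an assumed helper name. The 
`static_assert` records why passing `parallel_tasks` in the pointer slot 
cannot compile.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <type_traits>

// An int value never converts implicitly to a pointer type, so an int
// argument cannot bind to a std::atomic<int64_t>* parameter.
static_assert(!std::is_convertible<int, std::atomic<int64_t>*>::value,
              "int must not convert to std::atomic<int64_t>*");

// Hypothetical stand-in for ScannerContext: only the last three BE_TEST
// constructor parameters are modeled here.
struct FakeScannerContext {
    FakeScannerContext(int64_t limit_, std::atomic<int64_t>* shared_scan_limit_,
                       int num_parallel_instances_)
            : limit(limit_),
              shared_scan_limit(shared_scan_limit_),
              num_parallel_instances(num_parallel_instances_) {}

    int64_t limit;
    std::atomic<int64_t>* shared_scan_limit;
    int num_parallel_instances;
};

// Hypothetical test fixture: owns the shared limit so the pointer handed to
// the context stays valid for the lifetime of the fixture.
struct FakeScannerContextFixture {
    std::atomic<int64_t> shared_limit {-1};
    int parallel_tasks = 1;

    std::shared_ptr<FakeScannerContext> make_context(int64_t limit) {
        // &shared_limit is passed before parallel_tasks, matching the new order.
        return std::make_shared<FakeScannerContext>(limit, &shared_limit, parallel_tasks);
    }
};
```

   In the real test file the same idea would apply to each `create_shared` 
call: the fixture owns the atomic, and its address is passed ahead of 
`parallel_tasks`.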



##########
be/src/exec/scan/scanner_context.cpp:
##########
@@ -56,7 +56,8 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st
                                const TupleDescriptor* output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
                                const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, std::shared_ptr<Dependency> dependency
+                               int64_t limit_, std::shared_ptr<Dependency> dependency,
+                               std::atomic<int64_t>* shared_scan_limit
 #ifdef BE_TEST
                                ,
                                int num_parallel_instances

Review Comment:
   **[Medium/Liveness concern]** Consider this scenario:
   1. There are 10 scanners total, max concurrency = 2. Scanner A and Scanner B 
are scheduled (`_num_scheduled_scanners = 2`).
   2. Scanner A produces a block, calls `acquire_limit_quota()`, gets all 
remaining quota → `_shared_scan_limit` becomes 0. Scanner A is marked `eos`.
   3. Scanner B hasn't produced any blocks yet. It checks `remaining_limit() == 
0` at the top of the while loop (scanner_scheduler.cpp:236) → marks `eos`.
   4. Both scanners push back their tasks via `push_back_scan_task` → 
`_num_scheduled_scanners` goes to 0, `_dependency->set_ready()`.
   5. `get_block_from_queue` is called. It processes Scanner A's task 
(increments `_num_finished_scanners`). Then it tries to 
`schedule_scan_task(nullptr, l)`. Inside, `_pull_next_scan_task(nullptr, ...)` 
checks `_shared_scan_limit->load() == 0` → returns `nullptr`. No new scanners 
scheduled.
   6. Scanner B's task is processed the same way.
   7. Eventually, `_tasks_queue` is empty, `_shared_scan_limit == 0`, 
`_num_scheduled_scanners == 0` → finish condition fires. **This works 
correctly.**
   
   However, there's a subtlety within step 4: between Scanner A's push-back and 
Scanner B's push-back, `get_block_from_queue` may run and find `_tasks_queue` 
non-empty (A's task). After consuming A's block, it finds `_tasks_queue` empty 
and `_num_scheduled_scanners == 1` (B still running), so the finish condition 
fails and `_dependency->block()` fires. This is fine because when B eventually 
pushes back, it calls `_dependency->set_ready()`, which re-triggers 
`get_block_from_queue`.
   
   **The logic appears correct**, but this should have unit tests covering the 
shared limit exhaustion path to confirm.
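
   To make the exhaustion path in step 2 concrete, here is one way a shared 
quota could be claimed atomically. This is a sketch under assumptions, not the 
PR's implementation: the free function `acquire_limit_quota` below is a 
hypothetical stand-in that takes the atomic by reference, and it models only a 
non-negative limit (any "no limit" sentinel such as -1 is not handled).

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical sketch: try to claim up to `desired` rows from a shared pool.
// Returns the number of rows actually granted, or 0 once the pool is empty.
inline int64_t acquire_limit_quota(std::atomic<int64_t>& shared_limit, int64_t desired) {
    int64_t remaining = shared_limit.load();
    while (remaining > 0) {
        const int64_t granted = std::min(desired, remaining);
        // On failure, compare_exchange_weak reloads `remaining` with the
        // current value, so the loop retries against fresh state.
        if (shared_limit.compare_exchange_weak(remaining, remaining - granted)) {
            return granted;
        }
    }
    return 0; // Another caller drained the quota (or it was already zero).
}
```

   With a pool of 100 rows, a request for 60 is granted in full, the next 
request for 60 is granted only the remaining 40, and every later request gets 
0. That last state is what Scanner B observes in step 3.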



##########
be/src/exec/scan/scanner_scheduler.cpp:
##########
@@ -232,6 +232,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     eos = true;
                     break;
                 }
+                // If shared limit quota is exhausted, stop scanning.
+                if (ctx->remaining_limit() == 0) {
+                    eos = true;
+                    break;

Review Comment:
   **[Nit/Performance]** This `remaining_limit() == 0` check at the top of the 
while loop largely overlaps with the `acquire_limit_quota` check below (lines 
280-291). After a block is produced, if `acquire_limit_quota` returns 0, the 
loop already breaks, so the top-of-loop check only matters when a *different* 
scanner exhausted the limit before this scanner's first iteration or between 
its iterations.
   
   This is fine for correctness and is a reasonable optimization: it avoids the 
overhead of `get_block` when the limit is already exhausted. It is not strictly 
necessary, since the `acquire_limit_quota(block_rows)` at line 281 would catch 
it anyway. Keep it for the early-exit benefit.
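
   The early-exit benefit can be illustrated with a single-threaded model of 
the loop. Everything here is hypothetical: `ScanStats` and `run_scan_loop` are 
illustrative names, the counter stands in for the cost of `get_block`, and the 
load followed by `fetch_sub` is only safe because the sketch runs on one 
thread.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical counters for the sketch.
struct ScanStats {
    int get_block_calls = 0;  // how many times the expensive read was issued
    int64_t rows_emitted = 0; // rows granted by the shared limit
};

// Hypothetical single-threaded model of the scan loop. When `check_at_top`
// is true, the loop exits before reading a block if the limit is already 0.
inline ScanStats run_scan_loop(std::atomic<int64_t>& shared_limit, int64_t block_rows,
                               bool check_at_top) {
    ScanStats stats;
    while (true) {
        if (check_at_top && shared_limit.load() == 0) {
            break; // early exit: no block is read
        }
        ++stats.get_block_calls; // stands in for get_block()
        const int64_t granted = std::min(block_rows, shared_limit.load());
        if (granted <= 0) {
            break; // quota check after the read: the block was wasted
        }
        shared_limit.fetch_sub(granted);
        stats.rows_emitted += granted;
    }
    return stats;
}
```

   When the limit is already 0 on entry, the version with the top-of-loop check 
reads no blocks, while the version without it reads one block and discards it. 
Both emit the same rows, which matches the observation that the check affects 
cost rather than correctness.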



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

