github-actions[bot] commented on code in PR #61535:
URL: https://github.com/apache/doris/pull/61535#discussion_r3037577896


##########
be/src/runtime/runtime_state.h:
##########
@@ -138,7 +138,40 @@ class RuntimeState {
 
     const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
     void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
+
+    MOCK_FUNCTION int block_max_rows() const {
+        return config::enable_adaptive_batch_size
+                       ? std::max(_query_options.batch_size, int(preferred_block_size_rows()))

Review Comment:
   `preferred_block_size_rows` is documented in FE as an upper bound, but this
`std::max()` turns it into a lower bound whenever the session value is smaller
than `batch_size`. After this PR, operators like `SetSource`, `StreamingAgg`,
`HashJoinProbe`, etc. call `state->block_max_rows()`, so a session such as
`set batch_size = 8192; set preferred_block_size_rows = 1024;` will still emit
8192-row blocks. That looks like a behavior regression against the new
session-variable contract.
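
   A minimal sketch of one way to keep the upper-bound semantics. The free
function `block_max_rows_sketch` is hypothetical and only stands in for
`RuntimeState::block_max_rows()`; treating a non-positive preferred value as
"unset" is an assumption, not something this PR states. Returning the preferred
value alone would be another valid reading of the contract.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for RuntimeState::block_max_rows().
// Assumption: preferred_rows <= 0 means "not set", so batch_size alone applies.
inline int block_max_rows_sketch(bool adaptive_enabled, int batch_size,
                                 int64_t preferred_rows) {
    assert(batch_size > 0);
    if (!adaptive_enabled || preferred_rows <= 0) {
        return batch_size;
    }
    // std::min keeps preferred_rows an upper bound on the result;
    // std::max would make it a lower bound instead.
    return static_cast<int>(std::min<int64_t>(batch_size, preferred_rows));
}
```

With `batch_size = 8192` and `preferred_block_size_rows = 1024`, this variant
caps blocks at 1024 rows rather than 8192.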



##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -392,9 +392,13 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, Block* block, bo
 
 bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
-    const bool need_batch = local_state._stop_emplace_flag
-                                    ? local_state._aggregated_block->empty()
-                                    : local_state._aggregated_block->rows() < state->batch_size();

Review Comment:
   Checking `block_max_bytes` only in `need_more_input_data()` is too late for 
this operator. `_distinct_pre_agg_with_serialized_key()` can materialize an 
oversized `_aggregated_block` in a single `push()`, because the split logic in 
`_insert_keys_into_block()` still only enforces the row cap stored in 
`batch_size`. For wide grouping keys, `pull()` will then return a block that 
already exceeds `state->block_max_bytes()`.
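
   A rough sketch of what a byte-aware split could compute before rows are
appended. The helper `rows_that_fit` and its parameters are hypothetical and do
not come from the PR; it also assumes a per-row byte estimate is available,
which may not hold for variable-width keys.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper: how many of `pending_rows` can be appended to a block
// that already holds `rows_in_block` rows of about `bytes_per_row` bytes each
// without exceeding either the row cap or the byte cap.
inline size_t rows_that_fit(size_t pending_rows, size_t rows_in_block,
                            size_t bytes_per_row, size_t max_rows,
                            size_t max_bytes) {
    assert(bytes_per_row > 0);
    const size_t row_room =
            max_rows > rows_in_block ? max_rows - rows_in_block : 0;
    const size_t byte_cap_rows = max_bytes / bytes_per_row;
    const size_t byte_room =
            byte_cap_rows > rows_in_block ? byte_cap_rows - rows_in_block : 0;
    size_t n = std::min({pending_rows, row_room, byte_room});
    // Assumption: an empty block always accepts one row, so a single key
    // wider than max_bytes still makes progress.
    if (n == 0 && rows_in_block == 0 && pending_rows > 0) {
        n = 1;
    }
    return n;
}
```

Rows beyond the returned count would go into the next output block, so the
limit is enforced inside `push()` rather than only at
`need_more_input_data()`.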



##########
be/src/exec/sort/vsorted_run_merger.cpp:
##########
@@ -197,6 +201,14 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
                 do_insert();
                 return Status::OK();
             }
+

Review Comment:
   This fixes the multi-run merge loop, but the
`if (_priority_queue.size() == 1)` fast path above still swaps the entire
remaining supplier block into `output_block` without checking `_batch_size` or
`_block_max_bytes`. Once the merge collapses to one run, sort sources can still
emit an oversized block and bypass the adaptive limit.
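
   A sketch of the check the fast path could make before swapping. The struct
and function names are hypothetical, and the average bytes-per-row estimate is
an assumption; the real merger would need to slice the supplier block
accordingly.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical result of the single-run fast-path check.
struct FastPathDecision {
    bool swap_whole_block; // true: hand over the remaining block as-is
    size_t rows_to_emit;   // rows to place in output_block on this call
};

// Decide whether the remaining supplier block may be swapped out whole, or
// whether only a bounded slice should be copied into output_block.
inline FastPathDecision single_run_fast_path(size_t remaining_rows,
                                             size_t remaining_bytes,
                                             size_t batch_size,
                                             size_t block_max_bytes) {
    assert(remaining_rows > 0 && batch_size > 0);
    if (remaining_rows <= batch_size && remaining_bytes <= block_max_bytes) {
        return {true, remaining_rows};
    }
    // Assumption: rows are roughly uniform, so an average width is good enough.
    const size_t bytes_per_row =
            std::max<size_t>(1, remaining_bytes / remaining_rows);
    // Always allow at least one row so the merge makes progress.
    const size_t by_bytes =
            std::max<size_t>(1, block_max_bytes / bytes_per_row);
    return {false, std::min({remaining_rows, batch_size, by_bytes})};
}
```

When `swap_whole_block` is false, the caller would copy `rows_to_emit` rows and
leave the rest in the supplier block for the next `get_next()` call.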



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

