github-actions[bot] commented on code in PR #61535:
URL: https://github.com/apache/doris/pull/61535#discussion_r3037577896
##########
be/src/runtime/runtime_state.h:
##########
@@ -138,7 +138,40 @@ class RuntimeState {
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
+
+ MOCK_FUNCTION int block_max_rows() const {
+ return config::enable_adaptive_batch_size
+ ? std::max(_query_options.batch_size, int(preferred_block_size_rows()))
Review Comment:
`preferred_block_size_rows` is documented in FE as an upper bound, but this
`std::max()` turns it into a lower bound whenever the session value is smaller
than `batch_size`. After this PR, operators like `SetSource`, `StreamingAgg`,
`HashJoinProbe`, etc. call `state->block_max_rows()`, so a session such as `set
batch_size = 8192; set preferred_block_size_rows = 1024;` will still emit
8192-row blocks. That looks like a behavior regression against the new
session-variable contract.
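A minimal standalone sketch of the two clamping semantics (the helper names here are illustrative, not the actual `RuntimeState` API): if `preferred_block_size_rows` is meant as an upper bound, the clamp would be `std::min`, not `std::max`:

```cpp
#include <algorithm>
#include <cassert>

// As written in the PR: the session value only ever raises the row count,
// so a preferred_block_size_rows smaller than batch_size is ignored.
int block_max_rows_as_written(int batch_size, int preferred_rows) {
    return std::max(batch_size, preferred_rows);  // acts as a lower bound
}

// If the FE documents the session value as an upper bound, std::min
// would enforce it while leaving larger values capped by batch_size.
int block_max_rows_upper_bound(int batch_size, int preferred_rows) {
    return std::min(batch_size, preferred_rows);  // acts as an upper bound
}
```

With `batch_size = 8192` and `preferred_block_size_rows = 1024`, the `std::max` form still reports 8192, matching the regression described in the comment above.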
##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -392,9 +392,13 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, Block* block, bo
bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) const {
auto& local_state = get_local_state(state);
- const bool need_batch = local_state._stop_emplace_flag
- ? local_state._aggregated_block->empty()
- : local_state._aggregated_block->rows() < state->batch_size();
Review Comment:
Checking `block_max_bytes` only in `need_more_input_data()` is too late for
this operator. `_distinct_pre_agg_with_serialized_key()` can materialize an
oversized `_aggregated_block` in a single `push()`, because the split logic in
`_insert_keys_into_block()` still only enforces the row cap stored in
`batch_size`. For wide grouping keys, `pull()` will then return a block that
already exceeds `state->block_max_bytes()`.
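A toy model of the missing check (this is not the Doris `Block` API; the real fix would go into the split logic of `_insert_keys_into_block()`): decide how many pending serialized keys can be appended before a flush is required, honoring both a row cap and a byte cap rather than rows alone:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical caps struct standing in for batch_size / block_max_bytes.
struct Caps {
    size_t max_rows;
    size_t max_bytes;
};

// Given the serialized byte size of each pending grouping key, return how
// many keys fit into the current block before a flush is required. Always
// accepts at least one key into an empty block so progress is guaranteed.
size_t rows_before_flush(const std::vector<size_t>& key_byte_sizes,
                         size_t cur_rows, size_t cur_bytes, Caps caps) {
    size_t n = 0;
    for (size_t sz : key_byte_sizes) {
        if (cur_rows + n >= caps.max_rows) break;                        // row cap
        if (cur_bytes + sz > caps.max_bytes && cur_rows + n > 0) break;  // byte cap
        cur_bytes += sz;
        ++n;
    }
    return n;
}
```

With wide keys (say 40 bytes each against a 100-byte cap), the byte condition triggers long before the row cap, which is exactly the case the row-only check misses.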
##########
be/src/exec/sort/vsorted_run_merger.cpp:
##########
@@ -197,6 +201,14 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
do_insert();
return Status::OK();
}
+
Review Comment:
This fixes the multi-run merge loop, but the `if (_priority_queue.size() ==
1)` fast path above still swaps the entire remaining supplier block into
`output_block` without checking `_batch_size` or `_block_max_bytes`. Once the
merge collapses to one run, sort sources can still emit an oversized block and
bypass the adaptive limit.
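A sketch of bounding the single-run fast path (names and the `Block` stand-in are illustrative, not the `VSortedRunMerger` interface): instead of swapping the entire remaining supplier block into `output_block`, emit at most the row cap per call, leaving the rest for the next `get_next()`; a byte cap would slice the same way:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for vectorized::Block; one int per row for illustration.
using Block = std::vector<int>;

// Move up to max_rows leading rows from `remaining` into `out`.
// Returns the number of rows actually emitted.
size_t emit_bounded(Block& remaining, Block& out, size_t max_rows) {
    size_t n = std::min(remaining.size(), max_rows);
    out.assign(remaining.begin(), remaining.begin() + n);
    remaining.erase(remaining.begin(), remaining.begin() + n);
    return n;
}
```

The point is only that the fast path needs the same `_batch_size` / `_block_max_bytes` check the multi-run loop now has; otherwise a collapsed merge hands out whatever size the last supplier produced.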
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]