github-actions[bot] commented on code in PR #61535:
URL: https://github.com/apache/doris/pull/61535#discussion_r3044480697
##########
be/src/storage/iterator/vcollect_iterator.cpp:
##########
@@ -490,10 +497,56 @@ Status VCollectIterator::_topn_next(Block* block) {
<< " mutable_block.rows()=" << mutable_block.rows();
*block = mutable_block.to_block();
+ // If byte budget is active and the result exceeds it, split into chunks.
+ size_t preferred_bytes = _reader->preferred_block_size_bytes();
+ if (preferred_bytes > 0 && block->rows() > 1 && block->bytes() > preferred_bytes) {
+ _topn_result_block = std::move(*block);
+ _topn_result_offset = 0;
+ return _topn_next_chunk(block);
+ }
+
_topn_eof = true;
return block->rows() > 0 ? Status::OK() : Status::Error<END_OF_FILE>("");
}
+Status VCollectIterator::_topn_next_chunk(Block* block) {
+ size_t total_rows = _topn_result_block.rows();
+ if (_topn_result_offset >= total_rows) {
+ _topn_result_block.clear();
+ _topn_eof = true;
+ return Status::Error<END_OF_FILE>("");
+ }
+
+ size_t remaining = total_rows - _topn_result_offset;
+ size_t chunk_rows = remaining;
+
+ // Cap by batch_max_rows.
+ chunk_rows = std::min(chunk_rows, (size_t)_reader->batch_max_rows());
+
+ // Cap by byte budget.
Review Comment:
This still does not guarantee that a chunk respects preferred_block_size_bytes.
chunk_rows is derived from the average row size of the full result, so a skewed
TopN result (for example, a few very wide rows followed by many narrow rows) can
produce a first chunk whose actual size is far above the budget even though the
average says it is safe. If this path is supposed to enforce the byte cap, it
needs a byte-aware cut over the actual rows being copied, not a global average.
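
A byte-aware cut could look roughly like the sketch below. It assumes per-row
byte sizes are available as a plain vector (`row_bytes`), which is a
simplification of a columnar block, and that the byte budget is active
(non-zero). `byte_aware_chunk_rows` and its parameters are hypothetical names
for illustration, not existing Doris APIs.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper (not a Doris API): returns how many rows, starting at
// `offset`, can be copied without exceeding `budget_bytes`, capped by
// `max_rows`. When rows remain and max_rows > 0 it returns at least 1, so a
// single oversized row still makes progress instead of stalling the iterator.
std::size_t byte_aware_chunk_rows(const std::vector<std::size_t>& row_bytes,
                                  std::size_t offset, std::size_t max_rows,
                                  std::size_t budget_bytes) {
    if (offset >= row_bytes.size() || max_rows == 0) {
        return 0;
    }
    std::size_t rows = 0;
    std::size_t bytes = 0;
    while (offset + rows < row_bytes.size() && rows < max_rows) {
        const std::size_t next = row_bytes[offset + rows];
        // Stop before the row that would push the chunk over budget,
        // unless the chunk is still empty.
        if (rows > 0 && bytes + next > budget_bytes) {
            break;
        }
        bytes += next;
        ++rows;
    }
    return rows;
}
```

With row sizes {1000, 1000, 10, 10, 10, 10} and a 1100-byte budget, the global
average (340 bytes per row) would allow 3 rows, or 2010 bytes, in the first
chunk. The sketch returns 1 row for the first chunk and 5 rows for the second.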
##########
be/src/exec/operator/nested_loop_join_probe_operator.cpp:
##########
@@ -194,7 +194,10 @@ void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState* sta
return build_blocks[_current_build_pos].rows();
};
- while (_join_block.rows() + add_rows() <= state->batch_size()) {
+ const auto block_max_bytes = state->block_max_bytes();
+ size_t effective_max_rows = state->block_max_rows();
+ bool bytes_estimated = false;
Review Comment:
effective_max_rows only becomes byte-aware after the first
process_probe_block() or process_build_block() append. That means the first
iteration can still materialize a full build or probe chunk based only on row
count and blow far past preferred_block_size_bytes on wide rows. The same late
adjustment pattern is duplicated in _generate_block_base_build(), so this is
still not a real byte-budget bound for nested-loop join output.
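
One way to make the very first iteration byte-aware is to estimate the joined
row width from the input blocks before anything is appended. The sketch below
is only an illustration under that assumption: `initial_effective_max_rows`
and all of its parameters are hypothetical and do not exist in Doris.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper (not a Doris API): derives a row cap for the join
// output *before* the first append, using the average row width of the probe
// and build inputs as an estimate of one joined row.
// max_bytes == 0 means the byte budget is disabled.
std::size_t initial_effective_max_rows(std::size_t max_rows, std::size_t max_bytes,
                                       std::size_t probe_bytes, std::size_t probe_rows,
                                       std::size_t build_bytes, std::size_t build_rows) {
    if (max_bytes == 0 || probe_rows == 0 || build_rows == 0) {
        return max_rows;
    }
    // A nested-loop join output row carries one probe row plus one build row.
    const std::size_t joined_row_bytes =
            probe_bytes / probe_rows + build_bytes / build_rows;
    if (joined_row_bytes == 0) {
        return max_rows;
    }
    // Allow at least one row so the operator can always make progress.
    const std::size_t rows_for_budget =
            std::max<std::size_t>(1, max_bytes / joined_row_bytes);
    return std::min(max_rows, rows_for_budget);
}
```

This only addresses the timing of the adjustment. It still relies on averages,
so heavily skewed inputs would need a per-row check on top of it.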
##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -207,16 +208,31 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}
} else {
DCHECK_EQ(_cache_block.rows(), 0);
- // is output row > batch_size, split some to cache_block
- if (out_block->rows() + _distinct_row.size() > batch_size) {
- size_t split_size = batch_size - out_block->rows();
+ // Compute maximum additional rows for out_block respecting both
+ // row limit (batch_size) and byte budget (block_max_bytes).
+ size_t max_rows_to_add = (out_block->rows() < (size_t)batch_size)
+ ? ((size_t)batch_size - out_block->rows())
+ : 0;
+ if (block_max_bytes > 0 && out_block->rows() > 0) {
+ size_t current_bytes = out_block->bytes();
+ if (current_bytes >= block_max_bytes) {
+ max_rows_to_add = 0;
+ } else {
+ size_t avg_row_bytes = current_bytes / out_block->rows();
+ if (avg_row_bytes > 0) {
+ size_t rows_for_budget = (block_max_bytes - current_bytes) / avg_row_bytes;
Review Comment:
Using out_block->bytes() / out_block->rows() here does not actually bound
the bytes added by the rows in _distinct_row. If the buffered rows are narrow
but the next distinct key is wide, such as large STRING, JSON, or VARIANT
values, rows_for_budget can still be at least 1 and this push emits a block far
above preferred_block_size_bytes. To preserve the new contract, this split
needs to account for the candidate rows' own bytes, not just the historical
average of the already-buffered rows.
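
A split that looks at the candidate rows themselves might be sketched as
follows. It assumes the byte size of each pending distinct row can be obtained
up front (`candidate_row_bytes`); `rows_within_budget` and its parameters are
hypothetical names, not existing Doris APIs.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a Doris API): returns how many leading candidate
// rows can be appended to an output block that already holds `current_rows`
// rows and `current_bytes` bytes without exceeding `budget_bytes`.
// budget_bytes == 0 means the byte budget is disabled.
std::size_t rows_within_budget(std::size_t current_rows, std::size_t current_bytes,
                               std::size_t budget_bytes, std::size_t max_rows_to_add,
                               const std::vector<std::size_t>& candidate_row_bytes) {
    if (budget_bytes == 0) {
        return std::min(max_rows_to_add, candidate_row_bytes.size());
    }
    std::size_t accepted = 0;
    std::size_t bytes = current_bytes;
    for (const std::size_t row : candidate_row_bytes) {
        if (accepted >= max_rows_to_add) {
            break;
        }
        const bool fits = bytes + row <= budget_bytes;
        // An empty output block must take one row even if it is oversized,
        // otherwise the operator could never emit that row.
        const bool must_progress = (current_rows == 0 && accepted == 0);
        if (!fits && !must_progress) {
            break;
        }
        bytes += row;
        ++accepted;
    }
    return accepted;
}
```

With 100 buffered rows totalling 1000 bytes and a 4096-byte budget, the
average-based estimate (10 bytes per row) permits hundreds of additional rows,
while this sketch accepts none when the next candidate is 5000 bytes wide. The
remainder would go to the cache block, as in the existing split logic.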
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]