github-actions[bot] commented on code in PR #61535:
URL: https://github.com/apache/doris/pull/61535#discussion_r3044480697


##########
be/src/storage/iterator/vcollect_iterator.cpp:
##########
@@ -490,10 +497,56 @@ Status VCollectIterator::_topn_next(Block* block) {
                << " mutable_block.rows()=" << mutable_block.rows();
     *block = mutable_block.to_block();
 
+    // If byte budget is active and the result exceeds it, split into chunks.
+    size_t preferred_bytes = _reader->preferred_block_size_bytes();
+    if (preferred_bytes > 0 && block->rows() > 1 && block->bytes() > preferred_bytes) {
+        _topn_result_block = std::move(*block);
+        _topn_result_offset = 0;
+        return _topn_next_chunk(block);
+    }
+
     _topn_eof = true;
     return block->rows() > 0 ? Status::OK() : Status::Error<END_OF_FILE>("");
 }
 
+Status VCollectIterator::_topn_next_chunk(Block* block) {
+    size_t total_rows = _topn_result_block.rows();
+    if (_topn_result_offset >= total_rows) {
+        _topn_result_block.clear();
+        _topn_eof = true;
+        return Status::Error<END_OF_FILE>("");
+    }
+
+    size_t remaining = total_rows - _topn_result_offset;
+    size_t chunk_rows = remaining;
+
+    // Cap by batch_max_rows.
+    chunk_rows = std::min(chunk_rows, (size_t)_reader->batch_max_rows());
+
+    // Cap by byte budget.

Review Comment:
   This still does not guarantee that a chunk respects 
preferred_block_size_bytes. chunk_rows is derived from the average row size of 
the full result, so a skewed TopN result (for example, a few very wide rows 
followed by many narrow ones) can produce a first chunk whose actual size is far 
above the budget even though the average says it is safe. If this path is 
supposed to enforce the byte cap, it needs a byte-aware cut over the rows 
actually being copied, not a global average.
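
   A minimal sketch of such a byte-aware cut, under stated assumptions: 
byte_aware_chunk_rows and the row_bytes vector are hypothetical and not part of 
Doris, and the sketch assumes a per-row byte size can be obtained for the 
buffered TopN result. It walks the rows that would be copied and stops before 
the running total exceeds the budget.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a Doris API): picks how many rows to copy starting
// at `offset` so that their summed bytes stay within `byte_budget`.
// `row_bytes[i]` is assumed to hold the materialized size of row i.
size_t byte_aware_chunk_rows(const std::vector<size_t>& row_bytes, size_t offset,
                             size_t max_rows, size_t byte_budget) {
    if (offset >= row_bytes.size()) {
        return 0;
    }
    const size_t limit = std::min(max_rows, row_bytes.size() - offset);
    size_t rows = 0;
    size_t bytes = 0;
    while (rows < limit) {
        const size_t next = row_bytes[offset + rows];
        // Always take at least one row so the caller makes progress, even
        // when a single row is wider than the whole budget.
        if (rows > 0 && bytes + next > byte_budget) {
            break;
        }
        bytes += next;
        ++rows;
    }
    return rows;
}
```

With row sizes {1000, 1000, 10, 10, 10, 10} and a 1024-byte budget, a global 
average of 340 bytes per row would admit 3 rows (2010 bytes) into the first 
chunk, while the cut above admits 1.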



##########
be/src/exec/operator/nested_loop_join_probe_operator.cpp:
##########
@@ -194,7 +194,10 @@ void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState* sta
         return build_blocks[_current_build_pos].rows();
     };
 
-    while (_join_block.rows() + add_rows() <= state->batch_size()) {
+    const auto block_max_bytes = state->block_max_bytes();
+    size_t effective_max_rows = state->block_max_rows();
+    bool bytes_estimated = false;

Review Comment:
   effective_max_rows only becomes byte-aware after the first 
process_probe_block() or process_build_block() append. The first iteration can 
therefore still materialize a full build or probe chunk based on row count 
alone and go far past preferred_block_size_bytes when rows are wide. The same 
late-adjustment pattern is duplicated in _generate_block_base_build(), so 
nested-loop join output still has no real byte-budget bound.
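
   One possible direction, sketched under assumptions: estimate the joined row 
width from the probe and build inputs before the loop starts, so the very first 
append is already capped. estimate_max_rows_up_front and its parameters are 
hypothetical names, not existing Doris functions. The result is still an 
average-based estimate, but it is available before anything is materialized.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper (not a Doris API): derives a row cap for join output
// from the sizes of the probe and build inputs, before the first append.
// Returns `max_rows` unchanged when the byte budget is disabled (0) or when
// either side is empty and no estimate can be made.
size_t estimate_max_rows_up_front(size_t probe_rows, size_t probe_bytes,
                                  size_t build_rows, size_t build_bytes,
                                  size_t max_rows, size_t max_bytes) {
    if (max_bytes == 0 || probe_rows == 0 || build_rows == 0) {
        return max_rows;
    }
    // Ceiling division so the estimate errs toward wider rows.
    const size_t probe_row = (probe_bytes + probe_rows - 1) / probe_rows;
    const size_t build_row = (build_bytes + build_rows - 1) / build_rows;
    // Each joined output row carries one probe row plus one build row.
    const size_t joined_row = probe_row + build_row;
    if (joined_row == 0) {
        return max_rows;
    }
    // Allow at least one row so the operator always makes progress.
    const size_t by_bytes = std::max<size_t>(1, max_bytes / joined_row);
    return std::min(max_rows, by_bytes);
}
```

Because the estimate averages over each input, skewed inputs can still 
overshoot; a strict bound would need the actual bytes of the rows being 
appended.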



##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -207,16 +208,31 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
             }
         } else {
             DCHECK_EQ(_cache_block.rows(), 0);
-            // is output row > batch_size, split some to cache_block
-            if (out_block->rows() + _distinct_row.size() > batch_size) {
-                size_t split_size = batch_size - out_block->rows();
+            // Compute maximum additional rows for out_block respecting both
+            // row limit (batch_size) and byte budget (block_max_bytes).
+            size_t max_rows_to_add = (out_block->rows() < (size_t)batch_size)
+                                             ? ((size_t)batch_size - out_block->rows())
+                                             : 0;
+            if (block_max_bytes > 0 && out_block->rows() > 0) {
+                size_t current_bytes = out_block->bytes();
+                if (current_bytes >= block_max_bytes) {
+                    max_rows_to_add = 0;
+                } else {
+                    size_t avg_row_bytes = current_bytes / out_block->rows();
+                    if (avg_row_bytes > 0) {
+                        size_t rows_for_budget = (block_max_bytes - current_bytes) / avg_row_bytes;

Review Comment:
   Using out_block->bytes() / out_block->rows() here does not actually bound 
the bytes added by the rows in _distinct_row. If the buffered rows are narrow 
but the next distinct key is wide (for example a large STRING, JSON, or VARIANT 
value), rows_for_budget can still be at least 1, and this push emits a block 
far above preferred_block_size_bytes. To preserve the new contract, the split 
needs to account for the bytes of the candidate rows themselves, not just the 
historical average of the already-buffered rows.
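
   A sketch of a split that looks at the candidate rows, assuming their sizes 
can be computed before the push. max_rows_to_add_by_bytes and 
candidate_row_bytes are hypothetical names introduced for illustration, not 
code from this PR.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a Doris API): returns how many candidate rows fit
// into the output block under both the row limit and the byte budget.
// `candidate_row_bytes[i]` is assumed to be the size of the i-th row that
// would be appended; `block_max_bytes == 0` means the budget is disabled.
size_t max_rows_to_add_by_bytes(size_t out_rows, size_t out_bytes,
                                const std::vector<size_t>& candidate_row_bytes,
                                size_t batch_size, size_t block_max_bytes) {
    size_t row_cap = out_rows < batch_size ? batch_size - out_rows : 0;
    row_cap = std::min(row_cap, candidate_row_bytes.size());
    if (block_max_bytes == 0) {
        return row_cap;
    }
    size_t added = 0;
    size_t bytes = out_bytes;
    while (added < row_cap) {
        const size_t next = candidate_row_bytes[added];
        // An empty block accepts one row regardless of width, so a single
        // oversized key cannot stall the operator.
        const bool block_empty = (out_rows + added == 0);
        if (!block_empty && bytes + next > block_max_bytes) {
            break;
        }
        bytes += next;
        ++added;
    }
    return added;
}
```

With 100 buffered rows totalling 800 bytes and a 1 MiB budget, the 
average-based formula would allow over 100,000 more rows; if the next 
candidate is 2,000,000 bytes wide, the loop above returns 0 and the row can go 
to the cache block instead.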



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

