github-actions[bot] commented on code in PR #61535: URL: https://github.com/apache/doris/pull/61535#discussion_r2963550638
########## be/src/storage/segment/adaptive_block_size_predictor.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/segment/adaptive_block_size_predictor.h" + +#include <gen_cpp/segment_v2.pb.h> + +#include <algorithm> +#include <cstddef> +#include <unordered_set> + +#include "core/block/block.h" +#include "storage/segment/segment.h" +#include "storage/tablet/tablet_schema.h" + +namespace doris::segment_v2 { + +AdaptiveBlockSizePredictor::AdaptiveBlockSizePredictor(size_t preferred_block_size_bytes, + size_t preferred_max_col_bytes) + : _block_size_bytes(preferred_block_size_bytes), _max_col_bytes(preferred_max_col_bytes) {} + +void AdaptiveBlockSizePredictor::update(const Block& block, + const std::vector<ColumnId>& output_columns) { + size_t rows = block.rows(); + if (rows == 0) { + return; + } + double cur = static_cast<double>(block.bytes()) / static_cast<double>(rows); + + if (!_has_history) { + _bytes_per_row = cur; + _has_history = true; + } else { + _bytes_per_row = kAlpha * _bytes_per_row + kBeta * cur; + } + + // Per-column EWMA sampling. + // output_columns[i] corresponds to block column position i (caller's guarantee). 
+ size_t ncols = std::min(output_columns.size(), size_t(block.columns())); + for (size_t i = 0; i < ncols; ++i) { + ColumnId cid = output_columns[i]; + double col_cur = static_cast<double>(block.get_by_position(i).column->byte_size()) / + static_cast<double>(rows); + auto it = _col_bytes_per_row.find(cid); + if (it == _col_bytes_per_row.end()) { + _col_bytes_per_row[cid] = col_cur; + } else { + it->second = kAlpha * it->second + kBeta * col_cur; + } + } +} + +size_t AdaptiveBlockSizePredictor::predict_next_rows(size_t max_rows, Segment& segment, + const std::vector<ColumnId>& output_columns) { + if (_block_size_bytes == 0) { + return max_rows; + } + + auto clamp_predicted_rows = [&](size_t predicted_rows) { + size_t clamped_rows = std::min(predicted_rows, max_rows); + if (!_has_history) { + clamped_rows = std::min(clamped_rows, kSafetyBatchRowThreshold); + } + return std::max(size_t(1), clamped_rows); + }; + + double estimated_bytes_per_row = 0.0; + + if (!_has_history) { + // First-call path: use cached metadata hint or compute it once. + if (_metadata_hint_bytes_per_row == 0.0) { + uint32_t seg_rows = segment.num_rows(); + if (seg_rows > 0) { + // Build a set of unique_ids we care about for fast lookup. + // We need the TabletSchema to map ColumnId -> unique_id. + // Use the segment's tablet_schema if available via traverse_column_meta_pbs. + // We collect raw data bytes only for columns in output_columns. + // To map output ColumnId to unique_id we need the schema; the segment + // exposes traverse_column_meta_pbs which visits every ColumnMetaPB. + // We accumulate bytes for all top-level columns and divide by segment rows. + // We use a conservative 1.2x factor to avoid over-shrinking the first batch. + double total_bytes = 0.0; + uint64_t matched_cols = 0; + + // Build a set of unique_ids corresponding to output_columns using the + // segment's tablet_schema. 
+ const TabletSchemaSPtr& ts = segment.tablet_schema(); + std::unordered_set<int32_t> wanted_uids; + if (ts) { + for (ColumnId cid : output_columns) { + if (static_cast<size_t>(cid) < ts->num_columns()) { + int32_t uid = ts->column(cid).unique_id(); + if (uid >= 0) { + wanted_uids.insert(uid); + } + } + } + } + + if (!wanted_uids.empty()) { + // traverse_column_meta_pbs visits all ColumnMetaPB entries in the footer. + auto collector = [&](const ColumnMetaPB& meta) { + if (meta.has_unique_id() && wanted_uids.contains(meta.unique_id()) && + meta.has_raw_data_bytes()) { + total_bytes += static_cast<double>(meta.raw_data_bytes()); + matched_cols++; + } + }; + // Ignore error: if traversal fails, total_bytes stays 0 and we fall through. + auto st = segment.traverse_column_meta_pbs(collector); + if (!st.ok()) { + LOG(WARNING) << "Failed to traverse column metadata: " << st.to_string(); + return clamp_predicted_rows(max_rows); + } + } + + if (matched_cols > 0 && total_bytes > 0.0) { + _metadata_hint_bytes_per_row = + (total_bytes / static_cast<double>(seg_rows)) * 1.2; + } + } + } + + if (_metadata_hint_bytes_per_row > 0.0) { + estimated_bytes_per_row = _metadata_hint_bytes_per_row; + } else { + // No metadata or empty segment — fall back to max_rows. + return clamp_predicted_rows(max_rows); + } + } else { + estimated_bytes_per_row = _bytes_per_row; + } + + if (estimated_bytes_per_row <= 0.0) { + return clamp_predicted_rows(max_rows); + } + + auto predicted = + static_cast<size_t>(static_cast<double>(_block_size_bytes) / estimated_bytes_per_row); + LOG(INFO) << "****** predicted rows: " << predicted + << ", estimated_bytes_per_row: " << estimated_bytes_per_row + << ", _block_size_bytes: " << _block_size_bytes; Review Comment: **[Bug/High]** This `LOG(INFO)` with `******` prefix is called on **every** `predict_next_rows()` invocation — i.e., every batch in every segment scan. This will generate massive log spam in production. 
This is clearly a debug statement accidentally left in; at minimum it should be demoted to `VLOG_DEBUG`, or removed entirely: ```cpp VLOG_DEBUG << "predicted rows: " << predicted << ", estimated_bytes_per_row: " << estimated_bytes_per_row << ", _block_size_bytes: " << _block_size_bytes; ``` ########## be/src/storage/iterator/block_reader.cpp: ########## @@ -386,6 +387,13 @@ Status BlockReader::_replace_key_next_block(Block* block, bool* eof) { break; } } + // Byte-budget check: after the inner loop _next_row is either EOF or the next different + // key, so it is safe to stop accumulating here without repeating any row. + if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 && + Block::columns_byte_size(target_columns) >= + _reader_context.preferred_block_size_bytes) { + break; + } Review Comment: **[Bug]** When `record_rowids` is enabled, this byte-budget `break` exits the outer `while` loop without resizing `_block_row_locations` to `target_block_row`. At line 349, `_block_row_locations` was resized to `batch_size()`, so after this break it will still have `batch_size()` entries instead of `target_block_row` entries. The EOF path at lines 372-374 correctly does `_block_row_locations.resize(target_block_row)`. This break path needs the same treatment: ```cpp if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 && Block::columns_byte_size(target_columns) >= _reader_context.preferred_block_size_bytes) { if (UNLIKELY(_reader_context.record_rowids)) { _block_row_locations.resize(target_block_row); } break; } ``` Without this fix, `current_block_row_locations()` returns a vector with stale entries, and the DCHECK at line 604 (`DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count)`) will fail. 
########## be/src/storage/iterator/block_reader.cpp: ########## @@ -539,6 +555,12 @@ Status BlockReader::_unique_key_next_block(Block* block, bool* eof) { LOG(WARNING) << "next failed: " << res; return res; } + // Byte-budget check: _next_row is already saved so stopping here is safe. + if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 && + Block::columns_byte_size(target_columns) >= + _reader_context.preferred_block_size_bytes) { + break; + } Review Comment: **[Bug]** Same issue as in `_replace_key_next_block`: when `record_rowids` is enabled, this byte-budget `break` exits without resizing `_block_row_locations` to `target_block_row`. At line 530, `_block_row_locations` was resized to `_reader_context.batch_size`. The EOF path at lines 548-550 correctly resizes. Fix: ```cpp if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 && Block::columns_byte_size(target_columns) >= _reader_context.preferred_block_size_bytes) { if (UNLIKELY(_reader_context.record_rowids)) { _block_row_locations.resize(target_block_row); } break; } ``` Note: in `_unique_key_next_block`, there is also the `_delete_sign_available` filtering path (starting at line 566) that uses `target_block_row` and `_block_row_locations`. An incorrectly-sized `_block_row_locations` could corrupt the filter logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
