github-actions[bot] commented on code in PR #61535:
URL: https://github.com/apache/doris/pull/61535#discussion_r2963550638


##########
be/src/storage/segment/adaptive_block_size_predictor.cpp:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/segment/adaptive_block_size_predictor.h"
+
+#include <gen_cpp/segment_v2.pb.h>
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_set>
+
+#include "core/block/block.h"
+#include "storage/segment/segment.h"
+#include "storage/tablet/tablet_schema.h"
+
+namespace doris::segment_v2 {
+
+AdaptiveBlockSizePredictor::AdaptiveBlockSizePredictor(size_t 
preferred_block_size_bytes,
+                                                       size_t 
preferred_max_col_bytes)
+        : _block_size_bytes(preferred_block_size_bytes), 
_max_col_bytes(preferred_max_col_bytes) {}
+
+void AdaptiveBlockSizePredictor::update(const Block& block,
+                                        const std::vector<ColumnId>& 
output_columns) {
+    size_t rows = block.rows();
+    if (rows == 0) {
+        return;
+    }
+    double cur = static_cast<double>(block.bytes()) / 
static_cast<double>(rows);
+
+    if (!_has_history) {
+        _bytes_per_row = cur;
+        _has_history = true;
+    } else {
+        _bytes_per_row = kAlpha * _bytes_per_row + kBeta * cur;
+    }
+
+    // Per-column EWMA sampling.
+    // output_columns[i] corresponds to block column position i (caller's 
guarantee).
+    size_t ncols = std::min(output_columns.size(), size_t(block.columns()));
+    for (size_t i = 0; i < ncols; ++i) {
+        ColumnId cid = output_columns[i];
+        double col_cur = 
static_cast<double>(block.get_by_position(i).column->byte_size()) /
+                         static_cast<double>(rows);
+        auto it = _col_bytes_per_row.find(cid);
+        if (it == _col_bytes_per_row.end()) {
+            _col_bytes_per_row[cid] = col_cur;
+        } else {
+            it->second = kAlpha * it->second + kBeta * col_cur;
+        }
+    }
+}
+
+size_t AdaptiveBlockSizePredictor::predict_next_rows(size_t max_rows, Segment& 
segment,
+                                                     const 
std::vector<ColumnId>& output_columns) {
+    if (_block_size_bytes == 0) {
+        return max_rows;
+    }
+
+    auto clamp_predicted_rows = [&](size_t predicted_rows) {
+        size_t clamped_rows = std::min(predicted_rows, max_rows);
+        if (!_has_history) {
+            clamped_rows = std::min(clamped_rows, kSafetyBatchRowThreshold);
+        }
+        return std::max(size_t(1), clamped_rows);
+    };
+
+    double estimated_bytes_per_row = 0.0;
+
+    if (!_has_history) {
+        // First-call path: use cached metadata hint or compute it once.
+        if (_metadata_hint_bytes_per_row == 0.0) {
+            uint32_t seg_rows = segment.num_rows();
+            if (seg_rows > 0) {
+                // Build a set of unique_ids we care about for fast lookup.
+                // We need the TabletSchema to map ColumnId -> unique_id.
+                // Use the segment's tablet_schema if available via 
traverse_column_meta_pbs.
+                // We collect raw data bytes only for columns in 
output_columns.
+                // To map output ColumnId to unique_id we need the schema; the 
segment
+                // exposes traverse_column_meta_pbs which visits every 
ColumnMetaPB.
+                // We accumulate bytes for all top-level columns and divide by 
segment rows.
+                // We use a conservative 1.2x factor to avoid over-shrinking 
the first batch.
+                double total_bytes = 0.0;
+                uint64_t matched_cols = 0;
+
+                // Build a set of unique_ids corresponding to output_columns 
using the
+                // segment's tablet_schema.
+                const TabletSchemaSPtr& ts = segment.tablet_schema();
+                std::unordered_set<int32_t> wanted_uids;
+                if (ts) {
+                    for (ColumnId cid : output_columns) {
+                        if (static_cast<size_t>(cid) < ts->num_columns()) {
+                            int32_t uid = ts->column(cid).unique_id();
+                            if (uid >= 0) {
+                                wanted_uids.insert(uid);
+                            }
+                        }
+                    }
+                }
+
+                if (!wanted_uids.empty()) {
+                    // traverse_column_meta_pbs visits all ColumnMetaPB 
entries in the footer.
+                    auto collector = [&](const ColumnMetaPB& meta) {
+                        if (meta.has_unique_id() && 
wanted_uids.contains(meta.unique_id()) &&
+                            meta.has_raw_data_bytes()) {
+                            total_bytes += 
static_cast<double>(meta.raw_data_bytes());
+                            matched_cols++;
+                        }
+                    };
+                    // Ignore error: if traversal fails, total_bytes stays 0 
and we fall through.
+                    auto st = segment.traverse_column_meta_pbs(collector);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "Failed to traverse column metadata: " 
<< st.to_string();
+                        return clamp_predicted_rows(max_rows);
+                    }
+                }
+
+                if (matched_cols > 0 && total_bytes > 0.0) {
+                    _metadata_hint_bytes_per_row =
+                            (total_bytes / static_cast<double>(seg_rows)) * 
1.2;
+                }
+            }
+        }
+
+        if (_metadata_hint_bytes_per_row > 0.0) {
+            estimated_bytes_per_row = _metadata_hint_bytes_per_row;
+        } else {
+            // No metadata or empty segment — fall back to max_rows.
+            return clamp_predicted_rows(max_rows);
+        }
+    } else {
+        estimated_bytes_per_row = _bytes_per_row;
+    }
+
+    if (estimated_bytes_per_row <= 0.0) {
+        return clamp_predicted_rows(max_rows);
+    }
+
+    auto predicted =
+            static_cast<size_t>(static_cast<double>(_block_size_bytes) / 
estimated_bytes_per_row);
+    LOG(INFO) << "****** predicted rows: " << predicted
+              << ", estimated_bytes_per_row: " << estimated_bytes_per_row
+              << ", _block_size_bytes: " << _block_size_bytes;

Review Comment:
   **[Bug/High]** This `LOG(INFO)` with `******` prefix is called on **every** 
`predict_next_rows()` invocation — i.e., every batch in every segment scan. 
This will generate massive log spam in production.
   
   This should be changed to `VLOG_DEBUG` or removed entirely. At a minimum, it is clearly a debug statement that was accidentally left in:
   ```cpp
   VLOG_DEBUG << "predicted rows: " << predicted
              << ", estimated_bytes_per_row: " << estimated_bytes_per_row
              << ", _block_size_bytes: " << _block_size_bytes;
   ```



##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -386,6 +387,13 @@ Status BlockReader::_replace_key_next_block(Block* block, 
bool* eof) {
                 break;
             }
         }
+        // Byte-budget check: after the inner loop _next_row is either EOF or 
the next different
+        // key, so it is safe to stop accumulating here without repeating any 
row.
+        if (config::enable_adaptive_batch_size && 
_reader_context.preferred_block_size_bytes > 0 &&
+            Block::columns_byte_size(target_columns) >=
+                    _reader_context.preferred_block_size_bytes) {
+            break;
+        }

Review Comment:
   **[Bug]** When `record_rowids` is enabled, this byte-budget `break` exits 
the outer `while` loop without resizing `_block_row_locations` to 
`target_block_row`. At line 349, `_block_row_locations` was resized to 
`batch_size()`, so after this break it will still have `batch_size()` entries 
instead of `target_block_row` entries.
   
   The EOF path at lines 372-374 correctly does 
`_block_row_locations.resize(target_block_row)`. This break path needs the same 
treatment:
   ```cpp
   if (config::enable_adaptive_batch_size && 
_reader_context.preferred_block_size_bytes > 0 &&
       Block::columns_byte_size(target_columns) >=
               _reader_context.preferred_block_size_bytes) {
       if (UNLIKELY(_reader_context.record_rowids)) {
           _block_row_locations.resize(target_block_row);
       }
       break;
   }
   ```
   
   Without this fix, `current_block_row_locations()` returns a vector with 
stale entries, and the DCHECK at line 604 
(`DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count)`) will 
fail.



##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -539,6 +555,12 @@ Status BlockReader::_unique_key_next_block(Block* block, 
bool* eof) {
             LOG(WARNING) << "next failed: " << res;
             return res;
         }
+        // Byte-budget check: _next_row is already saved so stopping here is 
safe.
+        if (config::enable_adaptive_batch_size && 
_reader_context.preferred_block_size_bytes > 0 &&
+            Block::columns_byte_size(target_columns) >=
+                    _reader_context.preferred_block_size_bytes) {
+            break;
+        }

Review Comment:
   **[Bug]** Same issue as in `_replace_key_next_block`: when `record_rowids` 
is enabled, this byte-budget `break` exits without resizing 
`_block_row_locations` to `target_block_row`. At line 530, 
`_block_row_locations` was resized to `_reader_context.batch_size`. The EOF 
path at lines 548-550 correctly resizes.
   
   Fix:
   ```cpp
   if (config::enable_adaptive_batch_size && 
_reader_context.preferred_block_size_bytes > 0 &&
       Block::columns_byte_size(target_columns) >=
               _reader_context.preferred_block_size_bytes) {
       if (UNLIKELY(_reader_context.record_rowids)) {
           _block_row_locations.resize(target_block_row);
       }
       break;
   }
   ```
   
   Note: in `_unique_key_next_block`, there is also the 
`_delete_sign_available` filtering path (starting at line 566) that uses 
`target_block_row` and `_block_row_locations`. An incorrectly-sized 
`_block_row_locations` could corrupt the filter logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to