github-actions[bot] commented on code in PR #61535:
URL: https://github.com/apache/doris/pull/61535#discussion_r2964348197
##########
be/src/storage/segment/segment_iterator.cpp:
##########
@@ -2456,6 +2463,22 @@ Status SegmentIterator::next_batch(Block* block) {
_init_virtual_columns(block);
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({
+ // Adaptive batch size: predict how many rows this batch should
read.
+ if (_block_size_predictor) {
+ auto predicted =
static_cast<uint32_t>(_block_size_predictor->predict_next_rows(
+ _initial_block_row_max, *_segment,
_opts.adaptive_batch_output_columns));
+ _opts.block_row_max = std::min(predicted,
_initial_block_row_max);
+ _opts.stats->adaptive_batch_size_predict_min_rows =
+
std::min(_opts.stats->adaptive_batch_size_predict_min_rows,
+ static_cast<int64_t>(predicted));
+ _opts.stats->adaptive_batch_size_predict_max_rows =
+
std::max(_opts.stats->adaptive_batch_size_predict_max_rows,
+ static_cast<int64_t>(predicted));
+ } else {
+ _opts.stats->adaptive_batch_size_predict_min_rows =
_opts.block_row_max;
+ _opts.stats->adaptive_batch_size_predict_max_rows =
_opts.block_row_max;
Review Comment:
**Minor (Low):** When `_block_size_predictor` is null (feature disabled),
this `else` branch unconditionally overwrites
`adaptive_batch_size_predict_min_rows` and
`adaptive_batch_size_predict_max_rows` with `_opts.block_row_max` on **every**
`next_batch()` call. Since `OlapReaderStatistics` is shared across segment
iterators for the same scanner, and `adaptive_batch_size_predict_min_rows` is
initialized to `INT64_MAX`, the first segment iterator's overwrite clobbers the
sentinel.
As a result, the profile counters will show misleading values when the feature
is disabled. Consider setting these counters only once, or guarding each write
with a sentinel check (e.g., store `_opts.block_row_max` only while the min
counter still holds `INT64_MAX` and the max counter still holds `0`).
##########
be/test/storage/segment/adaptive_block_size_predictor_test.cpp:
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/segment/adaptive_block_size_predictor.h"
+
+#include <gen_cpp/segment_v2.pb.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "core/block/block.h"
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "storage/olap_common.h"
+#include "storage/segment/mock/mock_segment.h"
+#include "storage/tablet/tablet_schema.h"
+
+namespace doris {
+namespace segment_v2 {
+
+// ── helper functions
──────────────────────────────────────────────────────────
+
+// Build a Block with N rows, each containing a single Int32 column of the
given value.
+static Block make_int32_block(size_t rows, int32_t value = 42) {
+ auto col = ColumnVector<TYPE_INT>::create();
+ col->reserve(rows);
+ for (size_t i = 0; i < rows; ++i) {
+ col->insert_value(value);
+ }
+ Block block;
+ block.insert({std::move(col), std::make_shared<DataTypeInt32>(), "c0"});
+ return block;
+}
+
+// Build a Block with N rows where each row holds a string of |str_len| bytes.
+static Block make_string_block(size_t rows, size_t str_len) {
+ auto col = ColumnString::create();
+ col->reserve(rows);
+ std::string s(str_len, 'x');
+ for (size_t i = 0; i < rows; ++i) {
+ col->insert_data(s.data(), s.size());
+ }
+ Block block;
+ block.insert({std::move(col), std::make_shared<DataTypeString>(), "c0"});
+ return block;
+}
+
+// ── AdaptiveBlockSizePredictorTest
───────────────────────────────────────────
+
+class AdaptiveBlockSizePredictorTest : public testing::Test {
+protected:
+ // 8 MB target, 1 MB per-column limit
+ static constexpr size_t kBlockBytes = 8 * 1024 * 1024;
+ static constexpr size_t kColBytes = 1 * 1024 * 1024;
+ static constexpr size_t kMaxRows = 4096;
+};
+
+// ── Test 1: no history
────────────────────────────────────────────────────────
+// Before any update, has_history == false and bytes_per_row == 0.
+// After the first update, has_history == true and bytes_per_row ==
block.bytes()/rows.
+TEST_F(AdaptiveBlockSizePredictorTest, NoHistoryReturnsMaxRows) {
+ AdaptiveBlockSizePredictor pred(kBlockBytes, kColBytes);
+
+ // Initially no history.
+ EXPECT_FALSE(pred.has_history_for_test());
+ EXPECT_DOUBLE_EQ(pred.bytes_per_row_for_test(), 0.0);
+
+ // After one update the first sample is stored directly (no EWMA blending).
+ Block blk = make_int32_block(100);
+ std::vector<ColumnId> cols = {0};
+ pred.update(blk, cols);
+
+ EXPECT_TRUE(pred.has_history_for_test());
+ double expected_bpr = static_cast<double>(blk.bytes()) / 100.0;
+ EXPECT_DOUBLE_EQ(pred.bytes_per_row_for_test(), expected_bpr);
+}
+
+// ── Test 2: EWMA convergence
──────────────────────────────────────────────────
+// When every update delivers the same sample, the EWMA stays exactly at that
+// value (0.9*v + 0.1*v == v for any v).
+TEST_F(AdaptiveBlockSizePredictorTest, EwmaConvergence) {
+ AdaptiveBlockSizePredictor pred(kBlockBytes, kColBytes);
+
+ std::vector<ColumnId> cols = {0};
+
+ // Compute expected bytes-per-row from an actual block so the test does not
+ // hard-code internal column memory layout assumptions.
+ Block probe = make_string_block(100, 100);
+ double expected_bpr = static_cast<double>(probe.bytes()) / 100.0;
+
+ // First update seeds the EWMA directly.
+ pred.update(probe, cols);
+ EXPECT_DOUBLE_EQ(pred.bytes_per_row_for_test(), expected_bpr);
+
+ // All subsequent updates carry the same sample → EWMA stays constant.
+ for (int i = 1; i < 50; ++i) {
+ Block blk = make_string_block(100, 100);
+ pred.update(blk, cols);
+ }
+ EXPECT_NEAR(pred.bytes_per_row_for_test(), expected_bpr, 0.01);
+}
+
+// ── Test 3: per-column constraint
────────────────────────────────────────────
+// After updates with a wide-column block the per-column limit must be tighter
+// than the total-bytes limit, verifying the predictor would apply it.
+TEST_F(AdaptiveBlockSizePredictorTest, PerColumnConstraint) {
+ AdaptiveBlockSizePredictor pred(kBlockBytes, kColBytes);
+
+ Block blk = make_string_block(100, 1024);
+ double first_bpr = static_cast<double>(blk.bytes()) / 100.0;
+ std::vector<ColumnId> cols = {0};
+ pred.update(blk, cols);
+
+ // After first update, col[0] estimate equals block bytes-per-row exactly.
+ EXPECT_NEAR(pred.col_bytes_per_row_for_test(0), first_bpr, 0.01);
+
+ // Second update with 200-row block of same row width → EWMA blends.
+ Block blk2 = make_string_block(200, 1024);
+ double second_bpr = static_cast<double>(blk2.bytes()) / 200.0;
+ pred.update(blk2, cols);
+
+ double col_bpr = pred.col_bytes_per_row_for_test(0);
+ double ewma_expected = 0.9 * first_bpr + 0.1 * second_bpr;
+ EXPECT_NEAR(col_bpr, ewma_expected, 1.0);
+
+ // The per-column row limit must be strictly smaller than the total row
limit
+ // for a 1024 B/row column (1 MB col limit vs 8 MB block limit).
+ size_t col_limit = static_cast<size_t>(static_cast<double>(kColBytes) /
col_bpr);
+ size_t total_limit =
+ static_cast<size_t>(static_cast<double>(kBlockBytes) /
pred.bytes_per_row_for_test());
+ EXPECT_LT(col_limit, total_limit);
+}
+
+// ── Test 4: zero rows block is ignored
───────────────────────────────────────
+TEST_F(AdaptiveBlockSizePredictorTest, ZeroRowsBlockIgnored) {
+ AdaptiveBlockSizePredictor pred(kBlockBytes, kColBytes);
+
+ // update() with an empty block must be a no-op.
+ Block blk = make_int32_block(0);
+ std::vector<ColumnId> cols = {0};
+ pred.update(blk, cols);
+
+ EXPECT_FALSE(pred.has_history_for_test());
+ EXPECT_DOUBLE_EQ(pred.bytes_per_row_for_test(), 0.0);
+
+ // A subsequent real update must still work normally.
+ Block blk2 = make_int32_block(50);
+ pred.update(blk2, cols);
+ EXPECT_TRUE(pred.has_history_for_test());
+ double expected_bpr = static_cast<double>(blk2.bytes()) / 50.0;
+ EXPECT_DOUBLE_EQ(pred.bytes_per_row_for_test(), expected_bpr);
+}
+
+// ── Test 5: disabled when preferred_block_size_bytes == 0
────────────────────
+// update() still records history; predict_next_rows() returns max_rows early
+// (requires a real Segment so that path is covered by integration tests).
+TEST_F(AdaptiveBlockSizePredictorTest, DisabledWhenBlockSizeIsZero) {
+ AdaptiveBlockSizePredictor pred(0, kColBytes);
+
+ Block blk = make_int32_block(1000);
+ std::vector<ColumnId> cols = {0};
+ pred.update(blk, cols);
+
+ // update() still records history even when budget == 0.
+ EXPECT_TRUE(pred.has_history_for_test());
+ EXPECT_GT(pred.bytes_per_row_for_test(), 0.0);
+}
+
+// ── Test 6: config flag disables predictor
────────────────────────────────────
+// When enable_adaptive_batch_size is false, no predictor should be created.
+// This is verified at the SegmentIterator level; here we just validate the
+// config default is true.
+TEST_F(AdaptiveBlockSizePredictorTest, ConfigDefaultEnabled) {
+ EXPECT_TRUE(config::enable_adaptive_batch_size);
+}
+
+// ── Test 7: multiple column sampling
─────────────────────────────────────────
+TEST_F(AdaptiveBlockSizePredictorTest, MultipleColumnSampling) {
+ AdaptiveBlockSizePredictor pred(kBlockBytes, kColBytes);
+
+ // Build a 2-column block: col0 = Int32 (4 B/row), col1 = String(100
B/row).
+ auto col0 = ColumnVector<TYPE_INT>::create();
+ auto col1 = ColumnString::create();
+ const size_t rows = 50;
+ std::string s(100, 'y');
+ for (size_t i = 0; i < rows; ++i) {
+ col0->insert_value(1);
+ col1->insert_data(s.data(), s.size());
+ }
+ Block blk;
+ blk.insert({std::move(col0), std::make_shared<DataTypeInt32>(), "c0"});
+ blk.insert({std::move(col1), std::make_shared<DataTypeString>(), "c1"});
+
+ std::vector<ColumnId> cols = {0, 1};
+ pred.update(blk, cols);
+
+ // Both columns must have per-column estimates after the first update.
+ double c0_bpr = pred.col_bytes_per_row_for_test(0);
+ double c1_bpr = pred.col_bytes_per_row_for_test(1);
+ EXPECT_GT(c0_bpr, 0.0);
+ EXPECT_GT(c1_bpr, 0.0);
+
+ // String column (100 B/row) must be much wider than Int32 column (4
B/row).
+ EXPECT_GT(c1_bpr, c0_bpr);
+
+ // Second update with same block → EWMA stays constant (same sample).
+ pred.update(blk, cols);
+
+ double expected_c0 =
static_cast<double>(blk.get_by_position(0).column->byte_size()) / rows;
+ double expected_c1 =
static_cast<double>(blk.get_by_position(1).column->byte_size()) / rows;
+ EXPECT_NEAR(pred.col_bytes_per_row_for_test(0), expected_c0, 0.01);
+ EXPECT_NEAR(pred.col_bytes_per_row_for_test(1), expected_c1, 0.01);
+}
+
+// ── predict_next_rows tests
──────────────────────────────────────────────────
+
+using ::testing::_;
+using ::testing::NiceMock;
+using ::testing::Return;
+
+// Helper: set up a MockSegment with tablet schema columns and a footer
containing
+// ColumnMetaPB entries. Each entry in |col_specs| is {unique_id,
uncompressed_bytes}.
+static NiceMock<MockSegment>* make_mock_segment(
+ uint32_t num_rows, const std::vector<std::pair<int32_t, uint64_t>>&
col_specs) {
+ auto* seg = new NiceMock<MockSegment>();
+
+ // Set up num_rows mock.
+ ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows));
+
Review Comment:
**Bug (High):** `Segment::num_rows()` is **non-virtual** (declared as
`uint32_t num_rows() const { return _num_rows; }` in `segment.h:119`). The
`ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows))` configures gmock's
virtual mock method, but when `predict_next_rows()` calls `segment.num_rows()`
through a `Segment&` reference, the compiler statically dispatches to the base
class's non-virtual method, bypassing the mock entirely.
The actual value returned will be whatever `_num_rows` was left at by the
base `Segment` constructor — which does **not** initialize `_num_rows` in its
member initializer list (see `segment.cpp:168-174`). This means tests like
`PredictNoHistoryMetadataHint` are reading **indeterminate memory** (undefined
behavior).
**Fix:** Since `MockSegment` is declared as a `friend` of `Segment`, you can
directly set the member:
```cpp
seg->_num_rows = num_rows;
```
instead of (or in addition to) the `ON_CALL` mock.
##########
be/src/exec/scan/olap_scanner.cpp:
##########
@@ -840,6 +844,19 @@ void OlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(local_state->_variant_doc_value_column_iter_count,
stats.variant_doc_value_column_iter_count);
+ if (stats.adaptive_batch_size_predict_max_rows > 0) {
+ auto cur_min =
local_state->_adaptive_batch_predict_min_rows_counter->value();
+ if (cur_min == 0 || stats.adaptive_batch_size_predict_min_rows <
cur_min) {
Review Comment:
**Minor (Low):** This read-check-store pattern on a shared
`RuntimeProfile::Counter` is a classic check-then-act (TOCTOU-style) race.
Multiple scanners call `_collect_profile_before_close` concurrently on the same
`local_state` counters, so thread A can read a stale `cur_min`, pass the check,
and then overwrite the correct value that thread B has just stored.
Since these are display-only profile counters, this is benign — the worst
outcome is a slightly inaccurate min/max in the query profile. But consider
documenting this or using `COUNTER_UPDATE` with atomic min/max if
`RuntimeProfile::Counter` supports it.
##########
regression-test/suites/query_p0/adaptive_batch_size/adaptive_batch_size.groovy:
##########
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression tests for the Adaptive Batch Size feature.
+//
+// Design notes:
+// - Each case runs the same query with the feature enabled and disabled, and
+// asserts that results are identical (correctness check).
+// - We do NOT directly assert internal block byte sizes, because the storage
+// layer does not expose them via SQL result columns. Correctness is the
+// primary requirement; performance / memory reduction is verified manually
+// or via profile counters in a separate benchmark.
+
+suite("adaptive_batch_size") {
+
+ // ── helpers
────────────────────────────────────────────────────────────────
+
+ def set_adaptive = { enabled ->
+ // preferred_block_size_bytes controls the feature end-to-end.
+ // Setting it to a very small value (e.g. 1 byte) forces the predictor
to
+ // return row count = 1 every time, which is a good stress test.
+ if (enabled) {
+ sql "set preferred_block_size_bytes = 8388608" // 8 MB
(default)
+ sql "set batch_size = 65535"
+ sql "set preferred_max_column_in_block_size_bytes = 1048576" // 1
MB
+ } else {
+ // Setting to 0 disables the byte-limit path at the accumulation
layer.
+ sql "set preferred_block_size_bytes = 0"
+ sql "set batch_size = 4096"
+ sql "set preferred_max_column_in_block_size_bytes = 0"
+ }
+ }
+
+ // ── Test 1: wide table (VARCHAR columns)
──────────────────────────────────
+ // Each row is ~10 KB; with 4096 rows that is ~40 MB/batch which OOM-risks.
+ // With adaptive=on the batch is trimmed to ~8 MB worth of rows.
+
+ sql "drop table if exists abs_wide_table"
+ sql """
+ create table abs_wide_table (
+ id int not null,
+ c1 varchar(4096),
+ c2 varchar(4096),
+ c3 varchar(4096)
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ // Insert 1000 rows with ~3 KB data each.
+ def wide_rows = (1..1000).collect { i ->
+ "(${i}, '${('a' * 1000)}', '${('b' * 1000)}', '${('c' * 1000)}')"
+ }
+ sql "insert into abs_wide_table values ${wide_rows.join(',')}"
+
+ // Run query with adaptive enabled and collect result.
+ set_adaptive(true)
+ def res_enabled = sql "select id, length(c1) as l1, length(c2) as l2,
length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4"
+
+ qt_wide "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3
from abs_wide_table order by 1, 2, 3, 4 limit 50"
+
Review Comment:
**Standards violation (Medium):** Per regression test standards, use the
`order_qt_` prefix instead of `qt_` to ensure deterministic, ordered output.
This applies to all query tags in this file (`qt_wide`, `qt_narrow`, `qt_agg`,
`qt_unique`, `qt_flag`).
While some of these queries have an explicit `ORDER BY` or return a single
row, the standard convention is to use the `order_qt_` prefix consistently.
##########
regression-test/suites/query_p0/adaptive_batch_size/adaptive_batch_size.groovy:
##########
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression tests for the Adaptive Batch Size feature.
+//
+// Design notes:
+// - Each case runs the same query with the feature enabled and disabled, and
+// asserts that results are identical (correctness check).
+// - We do NOT directly assert internal block byte sizes, because the storage
+// layer does not expose them via SQL result columns. Correctness is the
+// primary requirement; performance / memory reduction is verified manually
+// or via profile counters in a separate benchmark.
+
+suite("adaptive_batch_size") {
+
+ // ── helpers
────────────────────────────────────────────────────────────────
+
+ def set_adaptive = { enabled ->
+ // preferred_block_size_bytes controls the feature end-to-end.
+ // Setting it to a very small value (e.g. 1 byte) forces the predictor
to
+ // return row count = 1 every time, which is a good stress test.
+ if (enabled) {
+ sql "set preferred_block_size_bytes = 8388608" // 8 MB
(default)
+ sql "set batch_size = 65535"
+ sql "set preferred_max_column_in_block_size_bytes = 1048576" // 1
MB
+ } else {
+ // Setting to 0 disables the byte-limit path at the accumulation
layer.
+ sql "set preferred_block_size_bytes = 0"
+ sql "set batch_size = 4096"
+ sql "set preferred_max_column_in_block_size_bytes = 0"
+ }
+ }
+
+ // ── Test 1: wide table (VARCHAR columns)
──────────────────────────────────
+ // Each row is ~10 KB; with 4096 rows that is ~40 MB/batch which OOM-risks.
+ // With adaptive=on the batch is trimmed to ~8 MB worth of rows.
+
+ sql "drop table if exists abs_wide_table"
+ sql """
+ create table abs_wide_table (
+ id int not null,
+ c1 varchar(4096),
+ c2 varchar(4096),
+ c3 varchar(4096)
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ // Insert 1000 rows with ~3 KB data each.
+ def wide_rows = (1..1000).collect { i ->
+ "(${i}, '${('a' * 1000)}', '${('b' * 1000)}', '${('c' * 1000)}')"
+ }
+ sql "insert into abs_wide_table values ${wide_rows.join(',')}"
+
+ // Run query with adaptive enabled and collect result.
+ set_adaptive(true)
+ def res_enabled = sql "select id, length(c1) as l1, length(c2) as l2,
length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4"
+
+ qt_wide "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3
from abs_wide_table order by 1, 2, 3, 4 limit 50"
+
+ // Run query with adaptive disabled and collect result.
+ set_adaptive(false)
+ def res_disabled = sql "select id, length(c1) as l1, length(c2) as l2,
length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4"
+
+ // Results must be identical.
+ assertEquals(res_enabled.size(), res_disabled.size())
+ for (int i = 0; i < res_enabled.size(); i++) {
+ assertEquals(res_enabled[i].toString(), res_disabled[i].toString())
+ }
+
+ // sql "drop table abs_wide_table"
+
+
+ // ── Test 2: narrow table (INT columns)
───────────────────────────────────
+ // Rows are ~12 bytes each; with adaptive=on the predictor should converge
+ // toward returning close to max_rows (batch is still row-limited).
+
+ sql "drop table if exists abs_narrow_table"
+ sql """
Review Comment:
**Standards violation (Medium):** Per regression test standards: *"After
completing tests, do not drop tables; instead drop tables before using them in
tests, to preserve the environment for debugging."*
This `drop table` after use (and similar ones for `abs_agg_table`,
`abs_unique_table`, `abs_flag_table`) should be removed. The `drop table if
exists` before `CREATE TABLE` at the beginning of each test case is the correct
pattern (and is already present).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]