This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8572a0a467f [Chore](segment) refactor of SegmentIterator::_next_batch_internal (#56063)
8572a0a467f is described below

commit 8572a0a467f546ad68d6f6aafaf695fd70ce07dc
Author: Pxl <[email protected]>
AuthorDate: Thu Sep 18 11:17:18 2025 +0800

    [Chore](segment) refactor of SegmentIterator::_next_batch_internal (#56063)
    
    refactor of SegmentIterator::_next_batch_internal
---
 be/src/olap/iterators.h                            |   5 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 471 +++++++++------------
 be/src/olap/rowset/segment_v2/segment_iterator.h   |  27 +-
 3 files changed, 223 insertions(+), 280 deletions(-)

diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 69c1eae6213..f2aa423f72b 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -96,7 +96,7 @@ public:
     // REQUIRED (null is not allowed)
     OlapReaderStatistics* stats = nullptr;
     bool use_page_cache = false;
-    int block_row_max = 4096 - 32; // see https://github.com/apache/doris/pull/11816
+    uint32_t block_row_max = 4096 - 32; // see https://github.com/apache/doris/pull/11816
 
     TabletSchemaSPtr tablet_schema = nullptr;
     bool enable_unique_key_merge_on_write = false;
@@ -189,9 +189,6 @@ public:
     // return schema for this Iterator
     virtual const Schema& schema() const = 0;
 
-    // Only used by UT. Whether lazy-materialization-read is used by this iterator or not.
-    virtual bool is_lazy_materialization_read() const { return false; }
-
     // Return the data id such as segment id, used for keep the insert order when do
     // merge sort in priority queue
     virtual uint64_t data_id() const { return 0; }
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 25124643c18..6d5141ab814 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -391,7 +391,10 @@ Status SegmentIterator::init_iterators() {
     return Status::OK();
 }
 
-Status SegmentIterator::_lazy_init() {
+Status SegmentIterator::_lazy_init(vectorized::Block* block) {
+    if (_lazy_inited) {
+        return Status::OK();
+    }
     SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
     DorisMetrics::instance()->segment_read_total->increment(1);
     _row_bitmap.addRange(0, _segment->num_rows());
@@ -426,6 +429,64 @@ Status SegmentIterator::_lazy_init() {
     } else {
         _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
     }
+
+    // If the row bitmap size is smaller than block_row_max, there's no need 
to reserve that many column rows.
+    auto nrows_reserve_limit = std::min(_row_bitmap.cardinality(), 
uint64_t(_opts.block_row_max));
+    if (_lazy_materialization_read || _opts.record_rowids || 
_is_need_expr_eval) {
+        _block_rowids.resize(_opts.block_row_max);
+    }
+    _current_return_columns.resize(_schema->columns().size());
+
+    _vec_init_char_column_id(block);
+    for (size_t i = 0; i < _schema->column_ids().size(); i++) {
+        ColumnId cid = _schema->column_ids()[i];
+        const auto* column_desc = _schema->column(cid);
+        if (_is_pred_column[cid]) {
+            auto storage_column_type = _storage_name_and_type[cid].second;
+            // Char type is special , since char type's computational datatype 
is same with string,
+            // both are DataTypeString, but DataTypeString only return 
FieldType::OLAP_FIELD_TYPE_STRING
+            // in get_storage_field_type.
+            RETURN_IF_CATCH_EXCEPTION(
+                    // Here, cid will not go out of bounds
+                    // because the size of _current_return_columns equals 
_schema->tablet_columns().size()
+                    _current_return_columns[cid] = 
Schema::get_predicate_column_ptr(
+                            _is_char_type[cid] ? 
FieldType::OLAP_FIELD_TYPE_CHAR
+                                               : 
storage_column_type->get_storage_field_type(),
+                            storage_column_type->is_nullable(), 
_opts.io_ctx.reader_type));
+            _current_return_columns[cid]->set_rowset_segment_id(
+                    {_segment->rowset_id(), _segment->id()});
+            _current_return_columns[cid]->reserve(nrows_reserve_limit);
+        } else if (i >= block->columns()) {
+            // This column needs to be scanned, but doesn't need to be 
returned upward. (delete sign)
+            // if i >= block->columns means the column and not the pred_column 
means `column i` is
+            // a delete condition column. but the column is not effective in 
the segment. so we just
+            // create a column to hold the data.
+            // a. origin data -> b. delete condition -> c. new load data
+            // the segment of c do not effective delete condition, but it 
still need read the column
+            // to match the schema.
+            // TODO: skip read the not effective delete column to speed up 
segment read.
+            _current_return_columns[cid] = 
Schema::get_data_type_ptr(*column_desc)->create_column();
+            _current_return_columns[cid]->reserve(nrows_reserve_limit);
+        }
+    }
+
+    // Additional deleted filter condition will be materialized column be at 
the end of the block,
+    // after _output_column_by_sel_idx  will be erase, we not need to filter 
it,
+    // so erase it from _columns_to_filter in the first next_batch.
+    // Eg:
+    //      `delete from table where a = 10;`
+    //      `select b from table;`
+    // a column only effective in segment iterator, the block from query 
engine only contain the b column,
+    // so no need to filter a column by expr.
+    for (auto it = _columns_to_filter.begin(); it != 
_columns_to_filter.end();) {
+        if (*it >= block->columns()) {
+            it = _columns_to_filter.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    _lazy_inited = true;
     return Status::OK();
 }
 
@@ -1815,6 +1876,10 @@ bool SegmentIterator::_has_char_type(const Field& column_desc) {
 };
 
 void SegmentIterator::_vec_init_char_column_id(vectorized::Block* block) {
+    if (!_char_type_idx.empty()) {
+        return;
+    }
+    _is_char_type.resize(_schema->columns().size(), false);
     for (size_t i = 0; i < _schema->num_column_ids(); i++) {
         auto cid = _schema->column_id(i);
         const Field* column_desc = _schema->column(cid);
@@ -1824,9 +1889,6 @@ void 
SegmentIterator::_vec_init_char_column_id(vectorized::Block* block) {
         if (i < block->columns()) {
             if (_has_char_type(*column_desc)) {
                 _char_type_idx.emplace_back(i);
-                if (i != 0) {
-                    _char_type_idx_no_0.emplace_back(i);
-                }
             }
             if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) {
                 _is_char_type[cid] = true;
@@ -1920,12 +1982,13 @@ Status SegmentIterator::_init_current_block(
     return Status::OK();
 }
 
-void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
+Status SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
     VLOG_DEBUG << fmt::format(
             "Output non-predicate columns, _non_predicate_columns: [{}], "
             "_schema_block_id_map: [{}]",
             fmt::join(_non_predicate_columns, ","), 
fmt::join(_schema_block_id_map, ","));
+    RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
     for (auto cid : _non_predicate_columns) {
         auto loc = _schema_block_id_map[cid];
         // if loc > block->columns() means the column is delete column and 
should
@@ -1954,6 +2017,7 @@ void 
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
             // so do nothing here.
         }
     }
+    return Status::OK();
 }
 
 /**
@@ -1977,10 +2041,10 @@ void 
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
  * This approach optimizes reading performance by leveraging batch processing 
for continuous
  * rowid sequences and handling discontinuities gracefully in smaller chunks.
  */
-Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, 
uint32_t& nrows_read) {
+Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, 
uint16_t& nrows_read) {
     SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);
 
-    nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), 
nrows_read_limit);
+    nrows_read = 
(uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), 
nrows_read_limit);
     bool is_continuous = (nrows_read > 1) &&
                          (_block_rowids[nrows_read - 1] - _block_rowids[0] == 
nrows_read - 1);
     VLOG_DEBUG << fmt::format(
@@ -2066,8 +2130,8 @@ Status SegmentIterator::_read_columns_by_index(uint32_t 
nrows_read_limit, uint32
 
     return Status::OK();
 }
-
-void SegmentIterator::_replace_version_col(size_t num_rows) {
+void SegmentIterator::_replace_version_col_if_needed(const 
std::vector<ColumnId>& column_ids,
+                                                     size_t num_rows) {
     // Only the rowset with single version need to replace the version column.
     // Doris can't determine the version before publish_version finished, so
     // we can't write data to __DORIS_VERSION_COL__ in segment writer, the 
value
@@ -2076,10 +2140,8 @@ void SegmentIterator::_replace_version_col(size_t 
num_rows) {
     if (_opts.version.first != _opts.version.second) {
         return;
     }
-    auto cids = _schema->column_ids();
     int32_t version_idx = _schema->version_col_idx();
-    auto iter = std::find(cids.begin(), cids.end(), version_idx);
-    if (iter == cids.end()) {
+    if (std::ranges::find(column_ids, version_idx) == column_ids.end()) {
         return;
     }
 
@@ -2310,7 +2372,7 @@ Status SegmentIterator::_convert_to_expected_type(const 
std::vector<ColumnId>& c
             RETURN_IF_ERROR(vectorized::schema_util::cast_column({original, 
file_column_type, ""},
                                                                  
expected_type, &expected));
             _current_return_columns[i] = expected->assume_mutable();
-            _converted_column_ids[i] = 1;
+            _converted_column_ids[i] = true;
             VLOG_DEBUG << fmt::format(
                     "Convert {} fom file column type {} to {}, num_rows {}",
                     field_type->path() == nullptr ? "" : 
field_type->path()->get_path(),
@@ -2335,88 +2397,20 @@ Status 
SegmentIterator::copy_column_data_by_selector(vectorized::IColumn* input_
     return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, 
output_col.get());
 }
 
-void SegmentIterator::_clear_iterators() {
-    _column_iterators.clear();
-    _bitmap_index_iterators.clear();
-    _index_iterators.clear();
-}
-
 Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
     bool is_mem_reuse = block->mem_reuse();
     DCHECK(is_mem_reuse);
 
-    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
-    if (UNLIKELY(!_lazy_inited)) {
-        RETURN_IF_ERROR(_lazy_init());
-        _lazy_inited = true;
-        // If the row bitmap size is smaller than block_row_max, there's no 
need to reserve that many column rows.
-        auto nrows_reserve_limit =
-                std::min(_row_bitmap.cardinality(), 
uint64_t(_opts.block_row_max));
-        if (_lazy_materialization_read || _opts.record_rowids || 
_is_need_expr_eval) {
-            _block_rowids.resize(_opts.block_row_max);
-        }
-        _current_return_columns.resize(_schema->columns().size());
-        _converted_column_ids.resize(_schema->columns().size(), 0);
-        if (_char_type_idx.empty() && _char_type_idx_no_0.empty()) {
-            _is_char_type.resize(_schema->columns().size(), false);
-            _vec_init_char_column_id(block);
-        }
-        for (size_t i = 0; i < _schema->column_ids().size(); i++) {
-            ColumnId cid = _schema->column_ids()[i];
-            auto column_desc = _schema->column(cid);
-            if (_is_pred_column[cid]) {
-                auto storage_column_type = _storage_name_and_type[cid].second;
-                // Char type is special , since char type's computational 
datatype is same with string,
-                // both are DataTypeString, but DataTypeString only return 
FieldType::OLAP_FIELD_TYPE_STRING
-                // in get_storage_field_type.
-                RETURN_IF_CATCH_EXCEPTION(
-                        // Here, cid will not go out of bounds
-                        // because the size of _current_return_columns equals 
_schema->tablet_columns().size()
-                        _current_return_columns[cid] = 
Schema::get_predicate_column_ptr(
-                                _is_char_type[cid] ? 
FieldType::OLAP_FIELD_TYPE_CHAR
-                                                   : 
storage_column_type->get_storage_field_type(),
-                                storage_column_type->is_nullable(), 
_opts.io_ctx.reader_type));
-                _current_return_columns[cid]->set_rowset_segment_id(
-                        {_segment->rowset_id(), _segment->id()});
-                _current_return_columns[cid]->reserve(nrows_reserve_limit);
-            } else if (i >= block->columns()) {
-                // This column needs to be scanned, but doesn't need to be 
returned upward. (delete sign)
-                // if i >= block->columns means the column and not the 
pred_column means `column i` is
-                // a delete condition column. but the column is not effective 
in the segment. so we just
-                // create a column to hold the data.
-                // a. origin data -> b. delete condition -> c. new load data
-                // the segment of c do not effective delete condition, but it 
still need read the column
-                // to match the schema.
-                // TODO: skip read the not effective delete column to speed up 
segment read.
-                _current_return_columns[cid] =
-                        
Schema::get_data_type_ptr(*column_desc)->create_column();
-                _current_return_columns[cid]->reserve(nrows_reserve_limit);
-            }
-        }
+    RETURN_IF_ERROR(_lazy_init(block));
 
-        // Additional deleted filter condition will be materialized column be 
at the end of the block,
-        // after _output_column_by_sel_idx  will be erase, we not need to 
filter it,
-        // so erase it from _columns_to_filter in the first next_batch.
-        // Eg:
-        //      `delete from table where a = 10;`
-        //      `select b from table;`
-        // a column only effective in segment iterator, the block from query 
engine only contain the b column,
-        // so no need to filter a column by expr.
-        for (auto it = _columns_to_filter.begin(); it != 
_columns_to_filter.end();) {
-            if (*it >= block->columns()) {
-                it = _columns_to_filter.erase(it);
-            } else {
-                ++it;
-            }
-        }
-    }
+    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
 
-    uint32_t nrows_read_limit = _opts.block_row_max;
+    // If the row bitmap size is smaller than nrows_read_limit, there's no 
need to reserve that many column rows.
+    uint32_t nrows_read_limit =
+            std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), 
_opts.block_row_max);
     if (_can_opt_topn_reads()) {
         nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit), 
nrows_read_limit);
     }
-    // If the row bitmap size is smaller than nrows_read_limit, there's no 
need to reserve that many column rows.
-    nrows_read_limit = std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), 
nrows_read_limit);
     DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
         if (nrows_read_limit != 1) {
             LOG(ERROR) << "nrows_read_limit: " << nrows_read_limit
@@ -2428,69 +2422,44 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
     })
 
     RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, 
nrows_read_limit));
-    _converted_column_ids.assign(_schema->columns().size(), 0);
+    _converted_column_ids.assign(_schema->columns().size(), false);
 
-    _current_batch_rows_read = 0;
-    RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, 
_current_batch_rows_read));
-    if (std::find(_predicate_column_ids.begin(), _predicate_column_ids.end(),
-                  _schema->version_col_idx()) != _predicate_column_ids.end()) {
-        _replace_version_col(_current_batch_rows_read);
-    }
+    _selected_size = 0;
+    RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size));
+    _replace_version_col_if_needed(_predicate_column_ids, _selected_size);
 
     _opts.stats->blocks_load += 1;
-    _opts.stats->raw_rows_read += _current_batch_rows_read;
+    _opts.stats->raw_rows_read += _selected_size;
 
-    if (_current_batch_rows_read == 0) {
-        // Convert all columns in _current_return_columns to schema column
-        RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
-        for (int i = 0; i < block->columns(); i++) {
-            auto cid = _schema->column_id(i);
-            // todo(wb) abstract make column where
-            if (!_is_pred_column[cid]) {
-                block->replace_by_position(i, 
std::move(_current_return_columns[cid]));
-            }
-        }
-        block->clear_column_data();
-        // clear and release iterators memory footprint in advance
-        _clear_iterators();
-        return Status::EndOfFile("no more data in segment");
+    if (_selected_size == 0) {
+        return _process_eof(block);
     }
 
-    if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
-        if (_non_predicate_columns.empty()) {
-            return Status::InternalError("_non_predicate_columns is empty");
-        }
-        RETURN_IF_ERROR(_convert_to_expected_type(_predicate_column_ids));
-        RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
-        VLOG_DEBUG << fmt::format(
-                "No need to evaluate any predicates or filter block rows {}, "
-                "_current_batch_rows_read {}",
-                block->rows(), _current_batch_rows_read);
-        _output_non_pred_columns(block);
-    } else {
-        uint16_t selected_size = cast_set<uint16_t>(_current_batch_rows_read);
-        _sel_rowid_idx.resize(selected_size);
+    if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
+        _sel_rowid_idx.resize(_selected_size);
 
         if (_is_need_vec_eval || _is_need_short_eval) {
             _convert_dict_code_for_predicate_if_necessary();
 
             // step 1: evaluate vectorization predicate
-            selected_size = 
_evaluate_vectorization_predicate(_sel_rowid_idx.data(), selected_size);
+            _selected_size =
+                    _evaluate_vectorization_predicate(_sel_rowid_idx.data(), 
_selected_size);
 
             // step 2: evaluate short circuit predicate
             // todo(wb) research whether need to read short predicate after 
vectorization evaluation
             //          to reduce cost of read short circuit columns.
             //          In SSB test, it make no difference; So need more 
scenarios to test
-            selected_size = 
_evaluate_short_circuit_predicate(_sel_rowid_idx.data(), selected_size);
+            _selected_size =
+                    _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), 
_selected_size);
             VLOG_DEBUG << fmt::format("After evaluate predicates, selected 
size: {} ",
-                                      selected_size);
-            if (selected_size > 0) {
+                                      _selected_size);
+            if (_selected_size > 0) {
                 // step 3.1: output short circuit and predicate column
                 // when lazy materialization enables, _predicate_column_ids = 
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
                 // see _vec_init_lazy_materialization
                 // todo(wb) need to tell input columnids from output columnids
                 RETURN_IF_ERROR(_output_column_by_sel_idx(block, 
_predicate_column_ids,
-                                                          
_sel_rowid_idx.data(), selected_size));
+                                                          
_sel_rowid_idx.data(), _selected_size));
 
                 // step 3.2: read remaining expr column and evaluate it.
                 if (_is_need_expr_eval) {
@@ -2499,147 +2468,76 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
                         SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
                         RETURN_IF_ERROR(_read_columns_by_rowids(
                                 _non_predicate_column_ids, _block_rowids, 
_sel_rowid_idx.data(),
-                                selected_size, &_current_return_columns));
-                        if (std::find(_non_predicate_column_ids.begin(),
-                                      _non_predicate_column_ids.end(),
-                                      _schema->version_col_idx()) !=
-                            _non_predicate_column_ids.end()) {
-                            _replace_version_col(selected_size);
-                        }
-                        
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_column_ids));
-                        for (auto cid : _non_predicate_column_ids) {
-                            auto loc = _schema_block_id_map[cid];
-                            block->replace_by_position(loc,
-                                                       
std::move(_current_return_columns[cid]));
-                        }
+                                _selected_size, &_current_return_columns));
+                        
_replace_version_col_if_needed(_non_predicate_column_ids, _selected_size);
+                        
RETURN_IF_ERROR(_process_columns(_non_predicate_column_ids, block));
                     }
 
                     DCHECK(block->columns() > 
_schema_block_id_map[*_common_expr_columns.begin()]);
-                    // block->rows() takes the size of the first column by 
default.
-                    // If the first column is not predicate column,
-                    // it has not been read yet. add a const column that has 
been read to calculate rows().
-                    if (block->rows() == 0) {
-                        vectorized::MutableColumnPtr col0 =
-                                
std::move(*block->get_by_position(0).column).mutate();
-                        auto tmp_indicator_col =
-                                block->get_by_position(0)
-                                        
.type->create_column_const_with_default_value(
-                                                selected_size);
-                        block->replace_by_position(0, 
std::move(tmp_indicator_col));
-                        
_output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size,
-                                                             block);
-                        
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
-                        RETURN_IF_ERROR(
-                                _execute_common_expr(_sel_rowid_idx.data(), 
selected_size, block));
-                        block->replace_by_position(0, std::move(col0));
-                    } else {
-                        
_output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size,
-                                                             block);
-                        
block->shrink_char_type_column_suffix_zero(_char_type_idx);
-                        RETURN_IF_ERROR(
-                                _execute_common_expr(_sel_rowid_idx.data(), 
selected_size, block));
-                    }
+                    RETURN_IF_ERROR(
+                            _process_common_expr(_sel_rowid_idx.data(), 
_selected_size, block));
                 }
             } else {
-                // If column_predicate filters out all rows, the corresponding 
column in _current_return_columns[cid] must be a ColumnNothing.
-                // Because:
-                // 1. Before each batch, _init_return_columns is called to 
initialize _current_return_columns, and virtual columns in 
_current_return_columns are initialized as ColumnNothing.
-                // 2. When select_size == 0, the read method of 
VirtualColumnIterator will definitely not be called, so the corresponding 
Column remains a ColumnNothing
-                for (const auto pair : _vir_cid_to_idx_in_block) {
-                    auto cid = pair.first;
-                    auto pos = pair.second;
-                    const vectorized::ColumnNothing* nothing_col =
-                            
vectorized::check_and_get_column<vectorized::ColumnNothing>(
-                                    _current_return_columns[cid].get());
-                    DCHECK(nothing_col != nullptr)
-                            << fmt::format("ColumnNothing expected, but got 
{}, cid: {}, pos: {}",
-                                           
_current_return_columns[cid]->get_name(), cid, pos);
-                    _current_return_columns[cid] = 
_opts.vir_col_idx_to_type[pos]->create_column();
-                }
-
+                _fill_column_nothing();
                 if (_is_need_expr_eval) {
-                    // rows of this batch are all filtered by column 
predicates.
-                    
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_column_ids));
-
-                    for (auto cid : _non_predicate_column_ids) {
-                        auto loc = _schema_block_id_map[cid];
-                        block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
-                    }
+                    
RETURN_IF_ERROR(_process_columns(_non_predicate_column_ids, block));
                 }
             }
         } else if (_is_need_expr_eval) {
             DCHECK(!_predicate_column_ids.empty());
-            RETURN_IF_ERROR(_convert_to_expected_type(_predicate_column_ids));
+            RETURN_IF_ERROR(_process_columns(_predicate_column_ids, block));
             // first read all rows are insert block, initialize sel_rowid_idx 
to all rows.
-            for (auto cid : _predicate_column_ids) {
-                auto loc = _schema_block_id_map[cid];
-                block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
-            }
-            for (uint16_t i = 0; i < selected_size; ++i) {
+            for (uint16_t i = 0; i < _selected_size; ++i) {
                 _sel_rowid_idx[i] = i;
             }
-
-            // Here we just use col0 as row_number indicator. when reach here, 
we will calculate the predicates first.
-            //  then use the result to reduce our data read(that is, expr push 
down). there's now row in block means the first
-            //  column is not in common expr. so it's safe to replace it 
temporarily to provide correct `selected_size`.
-            VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, 
selected size {}",
-                                      block->rows(), selected_size);
-            if (block->rows() == 0) {
-                vectorized::MutableColumnPtr col0 =
-                        std::move(*block->get_by_position(0).column).mutate();
-                // temporary replace the column with a row number indicator. 
using a ColumnConst is more efficient than
-                //  insert_many_default
-                auto tmp_indicator_col =
-                        
block->get_by_position(0).type->create_column_const_with_default_value(
-                                selected_size);
-                block->replace_by_position(0, std::move(tmp_indicator_col));
-
-                _output_index_result_column_for_expr(_sel_rowid_idx.data(), 
selected_size, block);
-                
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
-                RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), 
selected_size, block));
-                // now recover the origin col0
-                block->replace_by_position(0, std::move(col0));
-            } else {
-                _output_index_result_column_for_expr(_sel_rowid_idx.data(), 
selected_size, block);
-                block->shrink_char_type_column_suffix_zero(_char_type_idx);
-                RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), 
selected_size, block));
-            }
-            VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, 
selected size {}",
-                                      block->rows(), selected_size);
+            RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), 
_selected_size, block));
         }
 
-        if (UNLIKELY(_opts.record_rowids)) {
-            _sel_rowid_idx.resize(selected_size);
-            _selected_size = selected_size;
-        }
-
-        if (_non_predicate_columns.empty()) {
-            // shrink char_type suffix zero data
-            block->shrink_char_type_column_suffix_zero(_char_type_idx);
-
-            return Status::OK();
-        }
         // step4: read non_predicate column
-        if (selected_size > 0) {
+        if (_selected_size > 0 && !_non_predicate_columns.empty()) {
             RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, 
_block_rowids,
-                                                    _sel_rowid_idx.data(), 
selected_size,
+                                                    _sel_rowid_idx.data(), 
_selected_size,
                                                     &_current_return_columns));
-            if (std::find(_non_predicate_columns.begin(), 
_non_predicate_columns.end(),
-                          _schema->version_col_idx()) != 
_non_predicate_columns.end()) {
-                _replace_version_col(selected_size);
-            }
+            _replace_version_col_if_needed(_non_predicate_columns, 
_selected_size);
         }
-
-        RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
-        // step5: output columns
-        _output_non_pred_columns(block);
     }
 
+    // step5: output columns
+    RETURN_IF_ERROR(_output_non_pred_columns(block));
     RETURN_IF_ERROR(_materialization_of_virtual_column(block));
-
     // shrink char_type suffix zero data
     block->shrink_char_type_column_suffix_zero(_char_type_idx);
+    return _check_output_block(block);
+}
 
+Status SegmentIterator::_process_columns(const std::vector<ColumnId>& 
column_ids,
+                                         vectorized::Block* block) {
+    RETURN_IF_ERROR(_convert_to_expected_type(column_ids));
+    for (auto cid : column_ids) {
+        auto loc = _schema_block_id_map[cid];
+        block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
+    }
+    return Status::OK();
+}
+
+void SegmentIterator::_fill_column_nothing() {
+    // If column_predicate filters out all rows, the corresponding column in 
_current_return_columns[cid] must be a ColumnNothing.
+    // Because:
+    // 1. Before each batch, _init_return_columns is called to initialize 
_current_return_columns, and virtual columns in _current_return_columns are 
initialized as ColumnNothing.
+    // 2. When select_size == 0, the read method of VirtualColumnIterator will 
definitely not be called, so the corresponding Column remains a ColumnNothing
+    for (const auto pair : _vir_cid_to_idx_in_block) {
+        auto cid = pair.first;
+        auto pos = pair.second;
+        const auto* nothing_col = 
vectorized::check_and_get_column<vectorized::ColumnNothing>(
+                _current_return_columns[cid].get());
+        DCHECK(nothing_col != nullptr)
+                << fmt::format("ColumnNothing expected, but got {}, cid: {}, 
pos: {}",
+                               _current_return_columns[cid]->get_name(), cid, 
pos);
+        _current_return_columns[cid] = 
_opts.vir_col_idx_to_type[pos]->create_column();
+    }
+}
+
+Status SegmentIterator::_check_output_block(vectorized::Block* block) {
 #ifndef NDEBUG
     size_t rows = block->rows();
     size_t idx = 0;
@@ -2657,17 +2555,16 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
             }
             std::string vir_cid_to_idx_in_block_msg =
                     fmt::format("_vir_cid_to_idx_in_block:[{}]", 
fmt::join(vcid_to_idx, ","));
-            throw doris::Exception(
-                    ErrorCode::INTERNAL_ERROR,
+            return Status::InternalError(
                     "Column in idx {} is nothing, block columns {}, 
normal_columns {}, "
                     "vir_cid_to_idx_in_block_msg {}",
                     idx, block->columns(), _schema->num_column_ids(), 
vir_cid_to_idx_in_block_msg);
         } else if (entry.column->size() != rows) {
-            throw doris::Exception(
-                    ErrorCode::INTERNAL_ERROR,
-                    "Unmatched size {}, expected {}, column: {}, type: {}, 
idx_in_block: {}",
+            return Status::InternalError(
+                    "Unmatched size {}, expected {}, column: {}, type: {}, 
idx_in_block: {}, "
+                    "block: {}",
                     entry.column->size(), rows, entry.column->get_name(), 
entry.type->get_name(),
-                    idx);
+                    idx, block->dump_structure());
         }
         idx++;
     }
@@ -2675,6 +2572,57 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
     return Status::OK();
 }
 
+Status SegmentIterator::_process_column_predicate() {
+    return Status::OK();
+}
+
+Status SegmentIterator::_process_eof(vectorized::Block* block) {
+    // Convert all columns in _current_return_columns to schema column
+    RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
+    for (int i = 0; i < block->columns(); i++) {
+        auto cid = _schema->column_id(i);
+        if (!_is_pred_column[cid]) {
+            block->replace_by_position(i, 
std::move(_current_return_columns[cid]));
+        }
+    }
+    block->clear_column_data();
+    // clear and release iterators memory footprint in advance
+    _column_iterators.clear();
+    _bitmap_index_iterators.clear();
+    _index_iterators.clear();
+    return Status::EndOfFile("no more data in segment");
+}
+
+Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, 
uint16_t& selected_size,
+                                             vectorized::Block* block) {
+    // Here we just use col0 as row_number indicator. when we reach here, we will 
calculate the predicates first.
+    //  then use the result to reduce our data read(that is, expr push down). 
there's no row in block means the first
+    //  column is not in common expr. so it's safe to replace it temporarily 
to provide correct `selected_size`.
+    VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected 
size {}", block->rows(),
+                              _selected_size);
+
+    bool need_mock_col = block->rows() != selected_size;
+    vectorized::MutableColumnPtr col0;
+    if (need_mock_col) {
+        col0 = std::move(*block->get_by_position(0).column).mutate();
+        block->replace_by_position(
+                0, 
block->get_by_position(0).type->create_column_const_with_default_value(
+                           _selected_size));
+    }
+
+    _output_index_result_column_for_expr(_sel_rowid_idx.data(), 
_selected_size, block);
+    block->shrink_char_type_column_suffix_zero(_char_type_idx);
+    RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), 
_selected_size, block));
+
+    if (need_mock_col) {
+        block->replace_by_position(0, std::move(col0));
+    }
+
+    VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, 
selected size {}",
+                              block->rows(), _selected_size);
+    return Status::OK();
+}
+
 Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, 
uint16_t& selected_size,
                                              vectorized::Block* block) {
     SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
@@ -2749,23 +2697,15 @@ void 
SegmentIterator::_output_index_result_column_for_expr(uint16_t* sel_rowid_i
             const auto& index_result_bitmap =
                     
inverted_index_result_bitmap_for_expr.second.get_data_bitmap();
             auto index_result_column = vectorized::ColumnUInt8::create();
+
             vectorized::ColumnUInt8::Container& vec_match_pred = 
index_result_column->get_data();
             vec_match_pred.resize(block->rows());
-            size_t idx_in_selected = 0;
             roaring::BulkContext bulk_context;
-
-            for (uint32_t i = 0; i < _current_batch_rows_read; i++) {
-                auto rowid = _block_rowids[i];
-                if (sel_rowid_idx == nullptr ||
-                    (idx_in_selected < select_size && i == 
sel_rowid_idx[idx_in_selected])) {
-                    if (index_result_bitmap->containsBulk(bulk_context, 
rowid)) {
-                        vec_match_pred[idx_in_selected] = true;
-                    } else {
-                        vec_match_pred[idx_in_selected] = false;
-                    }
-                    idx_in_selected++;
-                }
+            for (uint32_t i = 0; i < select_size; i++) {
+                auto rowid = sel_rowid_idx ? _block_rowids[sel_rowid_idx[i]] : 
_block_rowids[i];
+                vec_match_pred[i] = 
index_result_bitmap->containsBulk(bulk_context, rowid);
             }
+
             DCHECK(block->rows() == vec_match_pred.size());
             
expr_ctx->get_inverted_index_context()->set_inverted_index_result_column_for_expr(
                     expr, std::move(index_result_column));
@@ -2805,15 +2745,14 @@ void 
SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl(
 
 Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* 
block_row_locations) {
     DCHECK(_opts.record_rowids);
-    DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
+    DCHECK_GE(_block_rowids.size(), _selected_size);
+    block_row_locations->resize(_selected_size);
     uint32_t sid = segment_id();
     if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
-        block_row_locations->resize(_current_batch_rows_read);
-        for (auto i = 0; i < _current_batch_rows_read; i++) {
+        for (auto i = 0; i < _selected_size; i++) {
             (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]);
         }
     } else {
-        block_row_locations->resize(_selected_size);
         for (auto i = 0; i < _selected_size; i++) {
             (*block_row_locations)[i] = RowLocation(sid, 
_block_rowids[_sel_rowid_idx[i]]);
         }
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index c83d6a4a866..7a82ed2f716 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -130,7 +130,6 @@ public:
     const Schema& schema() const override { return *_schema; }
     Segment& segment() { return *_segment; }
     StorageReadOptions& storage_read_options() { return _opts; }
-    bool is_lazy_materialization_read() const override { return 
_lazy_materialization_read; }
     uint64_t data_id() const override { return _segment->id(); }
     RowsetId rowset_id() const { return _segment->rowset_id(); }
     int64_t tablet_id() const { return _tablet_id; }
@@ -154,6 +153,8 @@ public:
 private:
     Status _next_batch_internal(vectorized::Block* block);
 
+    Status _check_output_block(vectorized::Block* block);
+
     template <typename Container>
     void _update_profile(RuntimeProfile* profile, const Container& predicates,
                          const std::string& title) {
@@ -167,7 +168,7 @@ private:
         profile->add_info_string(title, info);
     }
 
-    [[nodiscard]] Status _lazy_init();
+    [[nodiscard]] Status _lazy_init(vectorized::Block* block);
     [[nodiscard]] Status _init_impl(const StorageReadOptions& opts);
     [[nodiscard]] Status _init_return_column_iterators();
     [[nodiscard]] Status _init_bitmap_index_iterators();
@@ -217,15 +218,15 @@ private:
     // for vectorization implementation
     [[nodiscard]] Status _read_columns(const std::vector<ColumnId>& column_ids,
                                        vectorized::MutableColumns& 
column_block, size_t nrows);
-    [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, 
uint32_t& nrows_read);
-    void _replace_version_col(size_t num_rows);
+    [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, 
uint16_t& nrows_read);
+    void _replace_version_col_if_needed(const std::vector<ColumnId>& 
column_ids, size_t num_rows);
     Status _init_current_block(vectorized::Block* block,
                                std::vector<vectorized::MutableColumnPtr>& 
non_pred_vector,
                                uint32_t nrows_read_limit);
     uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, 
uint16_t selected_size);
     uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, 
uint16_t selected_size);
     void _collect_runtime_filter_predicate();
-    void _output_non_pred_columns(vectorized::Block* block);
+    Status _output_non_pred_columns(vectorized::Block* block);
     [[nodiscard]] Status _read_columns_by_rowids(std::vector<ColumnId>& 
read_column_ids,
                                                  std::vector<rowid_t>& 
rowid_vector,
                                                  uint16_t* sel_rowid_idx, 
size_t select_size,
@@ -281,6 +282,9 @@ private:
     // same with _extract_common_expr_columns, but only extract columns that 
can be used for index
     [[nodiscard]] Status _execute_common_expr(uint16_t* sel_rowid_idx, 
uint16_t& selected_size,
                                               vectorized::Block* block);
+    Status _process_common_expr(uint16_t* sel_rowid_idx, uint16_t& 
selected_size,
+                                vectorized::Block* block);
+
     uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t 
selected_size,
                                           const vectorized::IColumn::Filter& 
filter);
 
@@ -363,7 +367,13 @@ private:
 
     void _calculate_expr_in_remaining_conjunct_root();
 
-    void _clear_iterators();
+    Status _process_eof(vectorized::Block* block);
+
+    Status _process_column_predicate();
+
+    void _fill_column_nothing();
+
+    Status _process_columns(const std::vector<ColumnId>& column_ids, 
vectorized::Block* block);
 
     // Initialize virtual columns in the block, set all virtual columns in the 
block to ColumnNothing
     void _init_virtual_columns(vectorized::Block* block);
@@ -424,7 +434,7 @@ private:
     std::vector<ColumnId> _non_predicate_column_ids;
     // TODO: Should use std::vector<size_t>
     std::vector<ColumnId> _columns_to_filter;
-    std::vector<ColumnId> _converted_column_ids;
+    std::vector<bool> _converted_column_ids;
     // TODO: Should use std::vector<size_t>
     std::vector<int> _schema_block_id_map; // map from schema column id to 
column idx in Block
 
@@ -454,11 +464,8 @@ private:
 
     // char_type or array<char> type columns cid
     std::vector<size_t> _char_type_idx;
-    std::vector<size_t> _char_type_idx_no_0;
     std::vector<bool> _is_char_type;
 
-    // number of rows read in the current batch
-    uint32_t _current_batch_rows_read = 0;
     // used for compaction, record selectd rowids of current batch
     uint16_t _selected_size;
     std::vector<uint16_t> _sel_rowid_idx;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to