This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8572a0a467f [Chore](segment) refactor of SegmentIterator::_next_batch_internal (#56063)
8572a0a467f is described below
commit 8572a0a467f546ad68d6f6aafaf695fd70ce07dc
Author: Pxl <[email protected]>
AuthorDate: Thu Sep 18 11:17:18 2025 +0800
[Chore](segment) refactor of SegmentIterator::_next_batch_internal (#56063)
refactor of SegmentIterator::_next_batch_internal
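The refactor splits the old monolithic _next_batch_internal into small Status-returning helpers (_lazy_init(block), _process_eof, _process_columns, _process_common_expr, _fill_column_nothing, _check_output_block) and replaces the local selected_size / _current_batch_rows_read pair with the single _selected_size member. A toy, self-contained C++ model of that decomposition is shown below; it is illustrative only: Status, ToyIterator and the row counts are invented stand-ins, not Doris code.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <utility>

    struct Status {
        bool ok = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status EndOfFile(std::string m) { return {false, std::move(m)}; }
    };

    #define RETURN_IF_ERROR(stmt)       \
        do {                            \
            Status _s = (stmt);         \
            if (!_s.ok) return _s;      \
        } while (false)

    // Toy iterator: one helper per stage, each returning Status, with an early return on EOF.
    struct ToyIterator {
        bool lazy_inited = false;
        uint32_t rows_left = 10;
        uint16_t selected_size = 0;

        Status lazy_init() { // idempotent, like the new _lazy_init(block)
            if (lazy_inited) {
                return Status::OK();
            }
            lazy_inited = true;
            return Status::OK();
        }
        Status read_batch(uint32_t limit) {
            selected_size = static_cast<uint16_t>(std::min(limit, rows_left));
            rows_left -= selected_size;
            return Status::OK();
        }
        Status process_eof() { return Status::EndOfFile("no more data in segment"); }

        Status next_batch(uint32_t limit) {
            RETURN_IF_ERROR(lazy_init());
            RETURN_IF_ERROR(read_batch(limit));
            if (selected_size == 0) {
                return process_eof(); // EOF handling factored into its own helper
            }
            // predicate evaluation / column output helpers would be called here
            return Status::OK();
        }
    };

    int main() {
        ToyIterator it;
        Status s;
        while ((s = it.next_batch(4)).ok) {
            std::cout << "read batch of " << it.selected_size << " rows\n";
        }
        std::cout << s.msg << "\n";
    }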
---
be/src/olap/iterators.h | 5 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 471 +++++++++------------
be/src/olap/rowset/segment_v2/segment_iterator.h | 27 +-
3 files changed, 223 insertions(+), 280 deletions(-)
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 69c1eae6213..f2aa423f72b 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -96,7 +96,7 @@ public:
// REQUIRED (null is not allowed)
OlapReaderStatistics* stats = nullptr;
bool use_page_cache = false;
- int block_row_max = 4096 - 32; // see https://github.com/apache/doris/pull/11816
+ uint32_t block_row_max = 4096 - 32; // see https://github.com/apache/doris/pull/11816
TabletSchemaSPtr tablet_schema = nullptr;
bool enable_unique_key_merge_on_write = false;
@@ -189,9 +189,6 @@ public:
// return schema for this Iterator
virtual const Schema& schema() const = 0;
- // Only used by UT. Whether lazy-materialization-read is used by this iterator or not.
- virtual bool is_lazy_materialization_read() const { return false; }
-
// Return the data id such as segment id, used for keep the insert order when do
// merge sort in priority queue
virtual uint64_t data_id() const { return 0; }
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 25124643c18..6d5141ab814 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -391,7 +391,10 @@ Status SegmentIterator::init_iterators() {
return Status::OK();
}
-Status SegmentIterator::_lazy_init() {
+Status SegmentIterator::_lazy_init(vectorized::Block* block) {
+ if (_lazy_inited) {
+ return Status::OK();
+ }
SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
DorisMetrics::instance()->segment_read_total->increment(1);
_row_bitmap.addRange(0, _segment->num_rows());
@@ -426,6 +429,64 @@ Status SegmentIterator::_lazy_init() {
} else {
_range_iter.reset(new BitmapRangeIterator(_row_bitmap));
}
+
+ // If the row bitmap size is smaller than block_row_max, there's no need to reserve that many column rows.
+ auto nrows_reserve_limit = std::min(_row_bitmap.cardinality(), uint64_t(_opts.block_row_max));
+ if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) {
+ _block_rowids.resize(_opts.block_row_max);
+ }
+ _current_return_columns.resize(_schema->columns().size());
+
+ _vec_init_char_column_id(block);
+ for (size_t i = 0; i < _schema->column_ids().size(); i++) {
+ ColumnId cid = _schema->column_ids()[i];
+ const auto* column_desc = _schema->column(cid);
+ if (_is_pred_column[cid]) {
+ auto storage_column_type = _storage_name_and_type[cid].second;
+ // Char type is special , since char type's computational datatype is same with string,
+ // both are DataTypeString, but DataTypeString only return FieldType::OLAP_FIELD_TYPE_STRING
+ // in get_storage_field_type.
+ RETURN_IF_CATCH_EXCEPTION(
+ // Here, cid will not go out of bounds
+ // because the size of _current_return_columns equals _schema->tablet_columns().size()
+ _current_return_columns[cid] = Schema::get_predicate_column_ptr(
+ _is_char_type[cid] ? FieldType::OLAP_FIELD_TYPE_CHAR
+ : storage_column_type->get_storage_field_type(),
+ storage_column_type->is_nullable(), _opts.io_ctx.reader_type));
+ _current_return_columns[cid]->set_rowset_segment_id(
+ {_segment->rowset_id(), _segment->id()});
+ _current_return_columns[cid]->reserve(nrows_reserve_limit);
+ } else if (i >= block->columns()) {
+ // This column needs to be scanned, but doesn't need to be returned upward. (delete sign)
+ // if i >= block->columns means the column and not the pred_column means `column i` is
+ // a delete condition column. but the column is not effective in the segment. so we just
+ // create a column to hold the data.
+ // a. origin data -> b. delete condition -> c. new load data
+ // the segment of c do not effective delete condition, but it still need read the column
+ // to match the schema.
+ // TODO: skip read the not effective delete column to speed up segment read.
+ _current_return_columns[cid] = Schema::get_data_type_ptr(*column_desc)->create_column();
+ _current_return_columns[cid]->reserve(nrows_reserve_limit);
+ }
+ }
+
+ // Additional deleted filter condition will be materialized column be at the end of the block,
+ // after _output_column_by_sel_idx will be erase, we not need to filter it,
+ // so erase it from _columns_to_filter in the first next_batch.
+ // Eg:
+ // `delete from table where a = 10;`
+ // `select b from table;`
+ // a column only effective in segment iterator, the block from query engine only contain the b column,
+ // so no need to filter a column by expr.
+ for (auto it = _columns_to_filter.begin(); it != _columns_to_filter.end();) {
+ if (*it >= block->columns()) {
+ it = _columns_to_filter.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ _lazy_inited = true;
return Status::OK();
}
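The _columns_to_filter pruning loop in the hunk above drops filter column ids that point past the width of the block handed up by the query engine (delete-sign columns that only matter inside the segment iterator). A standalone sketch of the same pruning using C++20 std::erase_if follows; the column ids and counts are hypothetical, not taken from Doris.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        std::vector<uint32_t> columns_to_filter = {0, 1, 5, 7}; // hypothetical column ids
        uint32_t block_columns = 6;                             // width of the block returned upward
        // Same effect as the erase-while-iterating loop in _lazy_init: ids >= block->columns()
        // belong to delete-condition columns that are never handed back to the query engine.
        std::erase_if(columns_to_filter, [&](uint32_t cid) { return cid >= block_columns; });
        for (uint32_t cid : columns_to_filter) {
            std::cout << cid << ' '; // prints: 0 1 5
        }
    }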
@@ -1815,6 +1876,10 @@ bool SegmentIterator::_has_char_type(const Field& column_desc) {
};
void SegmentIterator::_vec_init_char_column_id(vectorized::Block* block) {
+ if (!_char_type_idx.empty()) {
+ return;
+ }
+ _is_char_type.resize(_schema->columns().size(), false);
for (size_t i = 0; i < _schema->num_column_ids(); i++) {
auto cid = _schema->column_id(i);
const Field* column_desc = _schema->column(cid);
@@ -1824,9 +1889,6 @@ void SegmentIterator::_vec_init_char_column_id(vectorized::Block* block) {
if (i < block->columns()) {
if (_has_char_type(*column_desc)) {
_char_type_idx.emplace_back(i);
- if (i != 0) {
- _char_type_idx_no_0.emplace_back(i);
- }
}
if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) {
_is_char_type[cid] = true;
@@ -1920,12 +1982,13 @@ Status SegmentIterator::_init_current_block(
return Status::OK();
}
-void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
+Status SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
VLOG_DEBUG << fmt::format(
"Output non-predicate columns, _non_predicate_columns: [{}], "
"_schema_block_id_map: [{}]",
fmt::join(_non_predicate_columns, ","), fmt::join(_schema_block_id_map, ","));
+ RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
for (auto cid : _non_predicate_columns) {
auto loc = _schema_block_id_map[cid];
// if loc > block->columns() means the column is delete column and should
@@ -1954,6 +2017,7 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
// so do nothing here.
}
}
+ return Status::OK();
}
/**
@@ -1977,10 +2041,10 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
* This approach optimizes reading performance by leveraging batch processing for continuous
* rowid sequences and handling discontinuities gracefully in smaller chunks.
*/
-Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read) {
+Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read) {
SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);
- nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
+ nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
bool is_continuous = (nrows_read > 1) &&
(_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
VLOG_DEBUG << fmt::format(
@@ -2066,8 +2130,8 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32
return Status::OK();
}
-
-void SegmentIterator::_replace_version_col(size_t num_rows) {
+void SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>& column_ids,
+ size_t num_rows) {
// Only the rowset with single version need to replace the version column.
// Doris can't determine the version before publish_version finished, so
// we can't write data to __DORIS_VERSION_COL__ in segment writer, the value
@@ -2076,10 +2140,8 @@ void SegmentIterator::_replace_version_col(size_t num_rows) {
if (_opts.version.first != _opts.version.second) {
return;
}
- auto cids = _schema->column_ids();
int32_t version_idx = _schema->version_col_idx();
- auto iter = std::find(cids.begin(), cids.end(), version_idx);
- if (iter == cids.end()) {
+ if (std::ranges::find(column_ids, version_idx) == column_ids.end()) {
return;
}
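With the rename to _replace_version_col_if_needed, the caller passes in the exact column-id list it just read and the helper returns early when the version column is not among them, using std::ranges::find instead of the old begin()/end() std::find. A minimal standalone illustration of that membership test follows; the ids are invented and the print statement stands in for the real fill of __DORIS_VERSION_COL__.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using ColumnId = uint32_t;

    // Mirrors the early-return shape of _replace_version_col_if_needed: do nothing
    // unless `version_idx` is one of the columns actually being read.
    void replace_version_col_if_needed(const std::vector<ColumnId>& column_ids, int32_t version_idx) {
        if (std::ranges::find(column_ids, static_cast<ColumnId>(version_idx)) == column_ids.end()) {
            return; // version column not read in this pass
        }
        std::cout << "would fill the version column here\n";
    }

    int main() {
        replace_version_col_if_needed({0, 1, 2}, 5); // no-op
        replace_version_col_if_needed({0, 5, 9}, 5); // triggers the replacement branch
    }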
@@ -2310,7 +2372,7 @@ Status SegmentIterator::_convert_to_expected_type(const std::vector<ColumnId>& c
RETURN_IF_ERROR(vectorized::schema_util::cast_column({original, file_column_type, ""},
expected_type, &expected));
_current_return_columns[i] = expected->assume_mutable();
- _converted_column_ids[i] = 1;
+ _converted_column_ids[i] = true;
VLOG_DEBUG << fmt::format(
"Convert {} fom file column type {} to {}, num_rows {}",
field_type->path() == nullptr ? "" : field_type->path()->get_path(),
@@ -2335,88 +2397,20 @@ Status SegmentIterator::copy_column_data_by_selector(vectorized::IColumn* input_
return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col.get());
}
-void SegmentIterator::_clear_iterators() {
- _column_iterators.clear();
- _bitmap_index_iterators.clear();
- _index_iterators.clear();
-}
-
Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
- SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
- if (UNLIKELY(!_lazy_inited)) {
- RETURN_IF_ERROR(_lazy_init());
- _lazy_inited = true;
- // If the row bitmap size is smaller than block_row_max, there's no need to reserve that many column rows.
- auto nrows_reserve_limit =
- std::min(_row_bitmap.cardinality(), uint64_t(_opts.block_row_max));
- if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) {
- _block_rowids.resize(_opts.block_row_max);
- }
- _current_return_columns.resize(_schema->columns().size());
- _converted_column_ids.resize(_schema->columns().size(), 0);
- if (_char_type_idx.empty() && _char_type_idx_no_0.empty()) {
- _is_char_type.resize(_schema->columns().size(), false);
- _vec_init_char_column_id(block);
- }
- for (size_t i = 0; i < _schema->column_ids().size(); i++) {
- ColumnId cid = _schema->column_ids()[i];
- auto column_desc = _schema->column(cid);
- if (_is_pred_column[cid]) {
- auto storage_column_type = _storage_name_and_type[cid].second;
- // Char type is special , since char type's computational datatype is same with string,
- // both are DataTypeString, but DataTypeString only return FieldType::OLAP_FIELD_TYPE_STRING
- // in get_storage_field_type.
- RETURN_IF_CATCH_EXCEPTION(
- // Here, cid will not go out of bounds
- // because the size of _current_return_columns equals _schema->tablet_columns().size()
- _current_return_columns[cid] = Schema::get_predicate_column_ptr(
- _is_char_type[cid] ? FieldType::OLAP_FIELD_TYPE_CHAR
- : storage_column_type->get_storage_field_type(),
- storage_column_type->is_nullable(), _opts.io_ctx.reader_type));
- _current_return_columns[cid]->set_rowset_segment_id(
- {_segment->rowset_id(), _segment->id()});
- _current_return_columns[cid]->reserve(nrows_reserve_limit);
- } else if (i >= block->columns()) {
- // This column needs to be scanned, but doesn't need to be returned upward. (delete sign)
- // if i >= block->columns means the column and not the pred_column means `column i` is
- // a delete condition column. but the column is not effective in the segment. so we just
- // create a column to hold the data.
- // a. origin data -> b. delete condition -> c. new load data
- // the segment of c do not effective delete condition, but it still need read the column
- // to match the schema.
- // TODO: skip read the not effective delete column to speed up segment read.
- _current_return_columns[cid] =
- Schema::get_data_type_ptr(*column_desc)->create_column();
- _current_return_columns[cid]->reserve(nrows_reserve_limit);
- }
- }
+ RETURN_IF_ERROR(_lazy_init(block));
- // Additional deleted filter condition will be materialized column be at the end of the block,
- // after _output_column_by_sel_idx will be erase, we not need to filter it,
- // so erase it from _columns_to_filter in the first next_batch.
- // Eg:
- // `delete from table where a = 10;`
- // `select b from table;`
- // a column only effective in segment iterator, the block from query engine only contain the b column,
- // so no need to filter a column by expr.
- for (auto it = _columns_to_filter.begin(); it != _columns_to_filter.end();) {
- if (*it >= block->columns()) {
- it = _columns_to_filter.erase(it);
- } else {
- ++it;
- }
- }
- }
+ SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
- uint32_t nrows_read_limit = _opts.block_row_max;
+ // If the row bitmap size is smaller than nrows_read_limit, there's no need to reserve that many column rows.
+ uint32_t nrows_read_limit =
+ std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), _opts.block_row_max);
if (_can_opt_topn_reads()) {
nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit), nrows_read_limit);
}
- // If the row bitmap size is smaller than nrows_read_limit, there's no need to reserve that many column rows.
- nrows_read_limit = std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), nrows_read_limit);
DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
if (nrows_read_limit != 1) {
LOG(ERROR) << "nrows_read_limit: " << nrows_read_limit
@@ -2428,69 +2422,44 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
})
RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
- _converted_column_ids.assign(_schema->columns().size(), 0);
+ _converted_column_ids.assign(_schema->columns().size(), false);
- _current_batch_rows_read = 0;
- RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _current_batch_rows_read));
- if (std::find(_predicate_column_ids.begin(), _predicate_column_ids.end(),
- _schema->version_col_idx()) != _predicate_column_ids.end()) {
- _replace_version_col(_current_batch_rows_read);
- }
+ _selected_size = 0;
+ RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size));
+ _replace_version_col_if_needed(_predicate_column_ids, _selected_size);
_opts.stats->blocks_load += 1;
- _opts.stats->raw_rows_read += _current_batch_rows_read;
+ _opts.stats->raw_rows_read += _selected_size;
- if (_current_batch_rows_read == 0) {
- // Convert all columns in _current_return_columns to schema column
- RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
- for (int i = 0; i < block->columns(); i++) {
- auto cid = _schema->column_id(i);
- // todo(wb) abstract make column where
- if (!_is_pred_column[cid]) {
- block->replace_by_position(i, std::move(_current_return_columns[cid]));
- }
- }
- block->clear_column_data();
- // clear and release iterators memory footprint in advance
- _clear_iterators();
- return Status::EndOfFile("no more data in segment");
+ if (_selected_size == 0) {
+ return _process_eof(block);
}
- if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
- if (_non_predicate_columns.empty()) {
- return Status::InternalError("_non_predicate_columns is empty");
- }
- RETURN_IF_ERROR(_convert_to_expected_type(_predicate_column_ids));
- RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
- VLOG_DEBUG << fmt::format(
- "No need to evaluate any predicates or filter block rows {}, "
- "_current_batch_rows_read {}",
- block->rows(), _current_batch_rows_read);
- _output_non_pred_columns(block);
- } else {
- uint16_t selected_size = cast_set<uint16_t>(_current_batch_rows_read);
- _sel_rowid_idx.resize(selected_size);
+ if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
+ _sel_rowid_idx.resize(_selected_size);
if (_is_need_vec_eval || _is_need_short_eval) {
_convert_dict_code_for_predicate_if_necessary();
// step 1: evaluate vectorization predicate
- selected_size = _evaluate_vectorization_predicate(_sel_rowid_idx.data(), selected_size);
+ _selected_size =
+ _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size);
// step 2: evaluate short circuit predicate
// todo(wb) research whether need to read short predicate after vectorization evaluation
// to reduce cost of read short circuit columns.
// In SSB test, it make no difference; So need more scenarios to test
- selected_size = _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), selected_size);
+ _selected_size =
+ _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size);
VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ",
- selected_size);
- if (selected_size > 0) {
+ _selected_size);
+ if (_selected_size > 0) {
// step 3.1: output short circuit and predicate column
// when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
// see _vec_init_lazy_materialization
// todo(wb) need to tell input columnids from output columnids
RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids,
- _sel_rowid_idx.data(), selected_size));
+ _sel_rowid_idx.data(), _selected_size));
// step 3.2: read remaining expr column and evaluate it.
if (_is_need_expr_eval) {
@@ -2499,147 +2468,76 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
RETURN_IF_ERROR(_read_columns_by_rowids(
_non_predicate_column_ids, _block_rowids, _sel_rowid_idx.data(),
- selected_size, &_current_return_columns));
- if (std::find(_non_predicate_column_ids.begin(),
- _non_predicate_column_ids.end(),
- _schema->version_col_idx()) !=
- _non_predicate_column_ids.end()) {
- _replace_version_col(selected_size);
- }
- RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_column_ids));
- for (auto cid : _non_predicate_column_ids) {
- auto loc = _schema_block_id_map[cid];
- block->replace_by_position(loc,
- std::move(_current_return_columns[cid]));
- }
+ _selected_size, &_current_return_columns));
+ _replace_version_col_if_needed(_non_predicate_column_ids, _selected_size);
+ RETURN_IF_ERROR(_process_columns(_non_predicate_column_ids, block));
}
DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
- // block->rows() takes the size of the first column by default.
- // If the first column is not predicate column,
- // it has not been read yet. add a const column that has been read to calculate rows().
- if (block->rows() == 0) {
- vectorized::MutableColumnPtr col0 =
- std::move(*block->get_by_position(0).column).mutate();
- auto tmp_indicator_col =
- block->get_by_position(0)
- .type->create_column_const_with_default_value(
- selected_size);
- block->replace_by_position(0, std::move(tmp_indicator_col));
- _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size,
- block);
- block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
- RETURN_IF_ERROR(
- _execute_common_expr(_sel_rowid_idx.data(), selected_size, block));
- block->replace_by_position(0, std::move(col0));
- } else {
- _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size,
- block);
- block->shrink_char_type_column_suffix_zero(_char_type_idx);
- RETURN_IF_ERROR(
- _execute_common_expr(_sel_rowid_idx.data(), selected_size, block));
- }
+ RETURN_IF_ERROR(
+ _process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
}
} else {
- // If column_predicate filters out all rows, the corresponding column in _current_return_columns[cid] must be a ColumnNothing.
- // Because:
- // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing.
- // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing
- for (const auto pair : _vir_cid_to_idx_in_block) {
- auto cid = pair.first;
- auto pos = pair.second;
- const vectorized::ColumnNothing* nothing_col =
- vectorized::check_and_get_column<vectorized::ColumnNothing>(
- _current_return_columns[cid].get());
- DCHECK(nothing_col != nullptr)
- << fmt::format("ColumnNothing expected, but got {}, cid: {}, pos: {}",
- _current_return_columns[cid]->get_name(), cid, pos);
- _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column();
- }
-
+ _fill_column_nothing();
if (_is_need_expr_eval) {
- // rows of this batch are all filtered by column predicates.
- RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_column_ids));
-
- for (auto cid : _non_predicate_column_ids) {
- auto loc = _schema_block_id_map[cid];
- block->replace_by_position(loc, std::move(_current_return_columns[cid]));
- }
+ RETURN_IF_ERROR(_process_columns(_non_predicate_column_ids, block));
}
}
} else if (_is_need_expr_eval) {
DCHECK(!_predicate_column_ids.empty());
- RETURN_IF_ERROR(_convert_to_expected_type(_predicate_column_ids));
+ RETURN_IF_ERROR(_process_columns(_predicate_column_ids, block));
// first read all rows are insert block, initialize sel_rowid_idx to all rows.
- for (auto cid : _predicate_column_ids) {
- auto loc = _schema_block_id_map[cid];
- block->replace_by_position(loc, std::move(_current_return_columns[cid]));
- }
- for (uint16_t i = 0; i < selected_size; ++i) {
+ for (uint16_t i = 0; i < _selected_size; ++i) {
_sel_rowid_idx[i] = i;
}
-
- // Here we just use col0 as row_number indicator. when reach here, we will calculate the predicates first.
- // then use the result to reduce our data read(that is, expr push down). there's now row in block means the first
- // column is not in common expr. so it's safe to replace it temporarily to provide correct `selected_size`.
- VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}",
- block->rows(), selected_size);
- if (block->rows() == 0) {
- vectorized::MutableColumnPtr col0 =
- std::move(*block->get_by_position(0).column).mutate();
- // temporary replace the column with a row number indicator. using a ColumnConst is more efficient than
- // insert_many_default
- auto tmp_indicator_col =
- block->get_by_position(0).type->create_column_const_with_default_value(
- selected_size);
- block->replace_by_position(0, std::move(tmp_indicator_col));
-
- _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size, block);
- block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
- RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), selected_size, block));
- // now recover the origin col0
- block->replace_by_position(0, std::move(col0));
- } else {
- _output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size, block);
- block->shrink_char_type_column_suffix_zero(_char_type_idx);
- RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), selected_size, block));
- }
- VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}",
- block->rows(), selected_size);
+ RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
}
- if (UNLIKELY(_opts.record_rowids)) {
- _sel_rowid_idx.resize(selected_size);
- _selected_size = selected_size;
- }
-
- if (_non_predicate_columns.empty()) {
- // shrink char_type suffix zero data
- block->shrink_char_type_column_suffix_zero(_char_type_idx);
-
- return Status::OK();
- }
// step4: read non_predicate column
- if (selected_size > 0) {
+ if (_selected_size > 0 && !_non_predicate_columns.empty()) {
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
- _sel_rowid_idx.data(), selected_size,
+ _sel_rowid_idx.data(), _selected_size,
&_current_return_columns));
- if (std::find(_non_predicate_columns.begin(), _non_predicate_columns.end(),
- _schema->version_col_idx()) != _non_predicate_columns.end()) {
- _replace_version_col(selected_size);
- }
+ _replace_version_col_if_needed(_non_predicate_columns, _selected_size);
}
-
- RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
- // step5: output columns
- _output_non_pred_columns(block);
}
+ // step5: output columns
+ RETURN_IF_ERROR(_output_non_pred_columns(block));
RETURN_IF_ERROR(_materialization_of_virtual_column(block));
-
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);
+ return _check_output_block(block);
+}
+Status SegmentIterator::_process_columns(const std::vector<ColumnId>& column_ids,
+ vectorized::Block* block) {
+ RETURN_IF_ERROR(_convert_to_expected_type(column_ids));
+ for (auto cid : column_ids) {
+ auto loc = _schema_block_id_map[cid];
+ block->replace_by_position(loc, std::move(_current_return_columns[cid]));
+ }
+ return Status::OK();
+}
+
+void SegmentIterator::_fill_column_nothing() {
+ // If column_predicate filters out all rows, the corresponding column in _current_return_columns[cid] must be a ColumnNothing.
+ // Because:
+ // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing.
+ // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing
+ for (const auto pair : _vir_cid_to_idx_in_block) {
+ auto cid = pair.first;
+ auto pos = pair.second;
+ const auto* nothing_col = vectorized::check_and_get_column<vectorized::ColumnNothing>(
+ _current_return_columns[cid].get());
+ DCHECK(nothing_col != nullptr)
+ << fmt::format("ColumnNothing expected, but got {}, cid: {}, pos: {}",
+ _current_return_columns[cid]->get_name(), cid, pos);
+ _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column();
+ }
+}
+
+Status SegmentIterator::_check_output_block(vectorized::Block* block) {
#ifndef NDEBUG
size_t rows = block->rows();
size_t idx = 0;
@@ -2657,17 +2555,16 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
}
std::string vir_cid_to_idx_in_block_msg =
fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","));
- throw doris::Exception(
- ErrorCode::INTERNAL_ERROR,
+ return Status::InternalError(
"Column in idx {} is nothing, block columns {}, normal_columns {}, "
"vir_cid_to_idx_in_block_msg {}",
idx, block->columns(), _schema->num_column_ids(), vir_cid_to_idx_in_block_msg);
} else if (entry.column->size() != rows) {
- throw doris::Exception(
- ErrorCode::INTERNAL_ERROR,
- "Unmatched size {}, expected {}, column: {}, type: {}, idx_in_block: {}",
+ return Status::InternalError(
+ "Unmatched size {}, expected {}, column: {}, type: {}, idx_in_block: {}, "
+ "block: {}",
entry.column->size(), rows, entry.column->get_name(), entry.type->get_name(),
- idx);
+ idx, block->dump_structure());
}
idx++;
}
@@ -2675,6 +2572,57 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
return Status::OK();
}
+Status SegmentIterator::_process_column_predicate() {
+ return Status::OK();
+}
+
+Status SegmentIterator::_process_eof(vectorized::Block* block) {
+ // Convert all columns in _current_return_columns to schema column
+ RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
+ for (int i = 0; i < block->columns(); i++) {
+ auto cid = _schema->column_id(i);
+ if (!_is_pred_column[cid]) {
+ block->replace_by_position(i, std::move(_current_return_columns[cid]));
+ }
+ }
+ block->clear_column_data();
+ // clear and release iterators memory footprint in advance
+ _column_iterators.clear();
+ _bitmap_index_iterators.clear();
+ _index_iterators.clear();
+ return Status::EndOfFile("no more data in segment");
+}
+
+Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
+ vectorized::Block* block) {
+ // Here we just use col0 as row_number indicator. when reach here, we will calculate the predicates first.
+ // then use the result to reduce our data read(that is, expr push down). there's now row in block means the first
+ // column is not in common expr. so it's safe to replace it temporarily to provide correct `selected_size`.
+ VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}", block->rows(),
+ _selected_size);
+
+ bool need_mock_col = block->rows() != selected_size;
+ vectorized::MutableColumnPtr col0;
+ if (need_mock_col) {
+ col0 = std::move(*block->get_by_position(0).column).mutate();
+ block->replace_by_position(
+ 0, block->get_by_position(0).type->create_column_const_with_default_value(
+ _selected_size));
+ }
+
+ _output_index_result_column_for_expr(_sel_rowid_idx.data(), _selected_size, block);
+ block->shrink_char_type_column_suffix_zero(_char_type_idx);
+ RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), _selected_size, block));
+
+ if (need_mock_col) {
+ block->replace_by_position(0, std::move(col0));
+ }
+
+ VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}",
+ block->rows(), _selected_size);
+ return Status::OK();
+}
+
Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
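_process_common_expr keeps the old trick for blocks that do not yet know their row count: temporarily put a const placeholder of size _selected_size in position 0 so block->rows() is correct while the common expression runs, then restore the original column, now gated by a single need_mock_col flag instead of two duplicated branches. A standalone sketch of that swap-and-restore pattern follows, using toy types rather than the vectorized::Block API.

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <vector>

    // Toy stand-ins: a "column" is just a sized object, a "block" is a vector of them.
    struct ToyColumn {
        size_t size = 0;
    };
    using ToyBlock = std::vector<std::unique_ptr<ToyColumn>>;

    size_t block_rows(const ToyBlock& block) { return block.empty() ? 0 : block[0]->size; }

    void run_expr_over(const ToyBlock& block) {
        std::cout << "expr sees " << block_rows(block) << " rows\n";
    }

    int main() {
        ToyBlock block;
        block.push_back(std::make_unique<ToyColumn>()); // column 0 not read yet: 0 rows
        size_t selected_size = 42;

        bool need_mock_col = block_rows(block) != selected_size;
        std::unique_ptr<ToyColumn> col0;
        if (need_mock_col) {
            col0 = std::move(block[0]);                                       // stash the real column 0
            block[0] = std::make_unique<ToyColumn>(ToyColumn{selected_size}); // placeholder with the row count
        }
        run_expr_over(block); // expr sees 42 rows
        if (need_mock_col) {
            block[0] = std::move(col0); // restore the original column
        }
    }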
@@ -2749,23 +2697,15 @@ void SegmentIterator::_output_index_result_column_for_expr(uint16_t* sel_rowid_i
const auto& index_result_bitmap = inverted_index_result_bitmap_for_expr.second.get_data_bitmap();
auto index_result_column = vectorized::ColumnUInt8::create();
+
vectorized::ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
vec_match_pred.resize(block->rows());
- size_t idx_in_selected = 0;
roaring::BulkContext bulk_context;
-
- for (uint32_t i = 0; i < _current_batch_rows_read; i++) {
- auto rowid = _block_rowids[i];
- if (sel_rowid_idx == nullptr ||
- (idx_in_selected < select_size && i == sel_rowid_idx[idx_in_selected])) {
- if (index_result_bitmap->containsBulk(bulk_context, rowid)) {
- vec_match_pred[idx_in_selected] = true;
- } else {
- vec_match_pred[idx_in_selected] = false;
- }
- idx_in_selected++;
- }
+ for (uint32_t i = 0; i < select_size; i++) {
+ auto rowid = sel_rowid_idx ? _block_rowids[sel_rowid_idx[i]] : _block_rowids[i];
+ vec_match_pred[i] = index_result_bitmap->containsBulk(bulk_context, rowid);
}
+
DCHECK(block->rows() == vec_match_pred.size());
expr_ctx->get_inverted_index_context()->set_inverted_index_result_column_for_expr(
expr, std::move(index_result_column));
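The rewritten loop above walks only the selected rows and indexes _block_rowids through the selection vector, instead of scanning every row read and tracking idx_in_selected by hand. A standalone sketch of that indexing follows, with a std::unordered_set standing in for the roaring bitmap and made-up row ids.

    #include <cstdint>
    #include <iostream>
    #include <unordered_set>
    #include <vector>

    int main() {
        std::vector<uint32_t> block_rowids = {100, 101, 102, 103, 104}; // rows read this batch
        std::vector<uint16_t> sel_rowid_idx = {0, 2, 4};                // survivors after predicates
        std::unordered_set<uint32_t> index_hits = {101, 102, 104};      // stand-in for the index bitmap

        std::vector<uint8_t> match_pred(sel_rowid_idx.size());
        for (uint32_t i = 0; i < sel_rowid_idx.size(); i++) {
            uint32_t rowid = block_rowids[sel_rowid_idx[i]];
            match_pred[i] = index_hits.count(rowid) != 0; // one flag per selected row
        }
        for (auto v : match_pred) {
            std::cout << int(v) << ' '; // prints: 0 1 1
        }
    }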
@@ -2805,15 +2745,14 @@ void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl(
Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) {
DCHECK(_opts.record_rowids);
- DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
+ DCHECK_GE(_block_rowids.size(), _selected_size);
+ block_row_locations->resize(_selected_size);
uint32_t sid = segment_id();
if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
- block_row_locations->resize(_current_batch_rows_read);
- for (auto i = 0; i < _current_batch_rows_read; i++) {
+ for (auto i = 0; i < _selected_size; i++) {
(*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]);
}
} else {
- block_row_locations->resize(_selected_size);
for (auto i = 0; i < _selected_size; i++) {
(*block_row_locations)[i] = RowLocation(sid, _block_rowids[_sel_rowid_idx[i]]);
}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h
index c83d6a4a866..7a82ed2f716 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -130,7 +130,6 @@ public:
const Schema& schema() const override { return *_schema; }
Segment& segment() { return *_segment; }
StorageReadOptions& storage_read_options() { return _opts; }
- bool is_lazy_materialization_read() const override { return _lazy_materialization_read; }
uint64_t data_id() const override { return _segment->id(); }
RowsetId rowset_id() const { return _segment->rowset_id(); }
int64_t tablet_id() const { return _tablet_id; }
@@ -154,6 +153,8 @@ public:
private:
Status _next_batch_internal(vectorized::Block* block);
+ Status _check_output_block(vectorized::Block* block);
+
template <typename Container>
void _update_profile(RuntimeProfile* profile, const Container& predicates,
const std::string& title) {
@@ -167,7 +168,7 @@ private:
profile->add_info_string(title, info);
}
- [[nodiscard]] Status _lazy_init();
+ [[nodiscard]] Status _lazy_init(vectorized::Block* block);
[[nodiscard]] Status _init_impl(const StorageReadOptions& opts);
[[nodiscard]] Status _init_return_column_iterators();
[[nodiscard]] Status _init_bitmap_index_iterators();
@@ -217,15 +218,15 @@ private:
// for vectorization implementation
[[nodiscard]] Status _read_columns(const std::vector<ColumnId>& column_ids,
vectorized::MutableColumns& column_block, size_t nrows);
- [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read);
- void _replace_version_col(size_t num_rows);
+ [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read);
+ void _replace_version_col_if_needed(const std::vector<ColumnId>& column_ids, size_t num_rows);
Status _init_current_block(vectorized::Block* block,
std::vector<vectorized::MutableColumnPtr>& non_pred_vector,
uint32_t nrows_read_limit);
uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
void _collect_runtime_filter_predicate();
- void _output_non_pred_columns(vectorized::Block* block);
+ Status _output_non_pred_columns(vectorized::Block* block);
[[nodiscard]] Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx, size_t select_size,
@@ -281,6 +282,9 @@ private:
// same with _extract_common_expr_columns, but only extract columns that can be used for index
[[nodiscard]] Status _execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
vectorized::Block* block);
+ Status _process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
+ vectorized::Block* block);
+
uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t selected_size,
const vectorized::IColumn::Filter& filter);
@@ -363,7 +367,13 @@ private:
void _calculate_expr_in_remaining_conjunct_root();
- void _clear_iterators();
+ Status _process_eof(vectorized::Block* block);
+
+ Status _process_column_predicate();
+
+ void _fill_column_nothing();
+
+ Status _process_columns(const std::vector<ColumnId>& column_ids, vectorized::Block* block);
// Initialize virtual columns in the block, set all virtual columns in the block to ColumnNothing
void _init_virtual_columns(vectorized::Block* block);
@@ -424,7 +434,7 @@ private:
std::vector<ColumnId> _non_predicate_column_ids;
// TODO: Should use std::vector<size_t>
std::vector<ColumnId> _columns_to_filter;
- std::vector<ColumnId> _converted_column_ids;
+ std::vector<bool> _converted_column_ids;
// TODO: Should use std::vector<size_t>
std::vector<int> _schema_block_id_map; // map from schema column id to column idx in Block
@@ -454,11 +464,8 @@ private:
// char_type or array<char> type columns cid
std::vector<size_t> _char_type_idx;
- std::vector<size_t> _char_type_idx_no_0;
std::vector<bool> _is_char_type;
- // number of rows read in the current batch
- uint32_t _current_batch_rows_read = 0;
// used for compaction, record selectd rowids of current batch
uint16_t _selected_size;
std::vector<uint16_t> _sel_rowid_idx;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]