This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a6bf4ee8c5f [Pick-2.0](segment iterator) remove range in first read to save time #26689 (#26861)
a6bf4ee8c5f is described below
commit a6bf4ee8c5f8d71c5fc0acb21a983d9d58fc81cc
Author: airborne12 <[email protected]>
AuthorDate: Mon Dec 11 21:37:15 2023 +0800
[Pick-2.0](segment iterator) remove range in first read to save time #26689 (#26861)
---
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 193 ++++++++++++++-------
be/src/olap/rowset/segment_v2/segment_iterator.h | 3 +-
2 files changed, 133 insertions(+), 63 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index a168d12599c..aca5768b93c 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -104,11 +104,12 @@ public:
explicit BitmapRangeIterator(const roaring::Roaring& bitmap) {
roaring_init_iterator(&bitmap.roaring, &_iter);
- _read_next_batch();
}
bool has_more_range() const { return !_eof; }
+ [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; }
+
// read next range into [*from, *to) whose size <= max_range_size.
// return false when there is no more range.
virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) {
@@ -147,6 +148,11 @@ public:
return true;
}
+ // read batch_size of rowids from roaring bitmap into buf array
+ virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) {
+ return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size);
+ }
+
private:
void _read_next_batch() {
_buf_pos = 0;
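
For context on the CRoaring call the new forward read_batch_rowids wrapper delegates to, here is a minimal standalone sketch (not part of the patch; the bitmap contents and batch size are made up) that drains a bitmap in fixed-size batches with roaring_read_uint32_iterator; each call returns how many values it wrote and advances the iterator past them.

    #include <roaring/roaring.h>

    #include <cstdint>
    #include <cstdio>

    int main() {
        // Small bitmap with a gap, in the spirit of the examples in this patch.
        const uint32_t values[] = {0, 1, 4, 5, 6, 7, 10};
        roaring_bitmap_t* bm = roaring_bitmap_create();
        for (uint32_t v : values) {
            roaring_bitmap_add(bm, v);
        }

        roaring_uint32_iterator_t it;
        roaring_init_iterator(bm, &it);

        // Read forward in batches of up to 4 values per call.
        uint32_t buf[4];
        uint32_t n;
        while ((n = roaring_read_uint32_iterator(&it, buf, 4)) > 0) {
            for (uint32_t i = 0; i < n; ++i) {
                std::printf("%u ", buf[i]);
            }
            std::printf("| batch of %u\n", n); // "0 1 4 5 | batch of 4", then "6 7 10 | batch of 3"
        }

        roaring_bitmap_free(bm);
        return 0;
    }
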
@@ -171,6 +177,8 @@ class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::Bit
public:
explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
roaring_init_iterator_last(&bitmap.roaring, &_riter);
+ _rowid_count = roaring_bitmap_get_cardinality(&bitmap.roaring);
+ _rowid_left = _rowid_count;
}
bool has_more_range() const { return !_riter.has_value; }
@@ -194,9 +202,51 @@ public:
return true;
}
+ /**
+ * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards.
+ * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the provided buffer.
+ * It updates the internal state to track how many row IDs are left to read in subsequent calls.
+ *
+ * The row IDs are read in reverse order, but stored in the buffer maintaining their original order in the bitmap.
+ *
+ * Example:
+ *   input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
+ *   If the bitmap has 12 elements and batch_size is set to 5, the function will first read [15, 16, 17, 18, 19]
+ *   into the buffer, leaving 7 elements left. In the next call with batch_size 5, it will read [4, 5, 6, 7, 10].
+ *
+ */
+ uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override {
+ if (!_riter.has_value || _rowid_left == 0) {
+ return 0;
+ }
+
+ if (_rowid_count <= batch_size) {
+ roaring_bitmap_to_uint32_array(_riter.parent, buf); // Fill 'buf' with '_rowid_count' elements.
+ uint32_t num_read = _rowid_left; // Save the number of row IDs read.
+ _rowid_left = 0; // No row IDs left after this operation.
+ return num_read; // Return the number of row IDs read.
+ }
+
+ uint32_t read_size = std::min(batch_size, _rowid_left);
+ uint32_t num_read = 0; // Counter for the number of row IDs read.
+
+ // Read row IDs into the buffer in reverse order.
+ while (num_read < read_size && _riter.has_value) {
+ buf[read_size - num_read - 1] = _riter.current_value;
+ num_read++;
+ _rowid_left--; // Decrement the count of remaining row IDs.
+ roaring_previous_uint32_iterator(&_riter);
+ }
+
+ // Return the actual number of row IDs read.
+ return num_read;
+ }
private:
roaring::api::roaring_uint32_iterator_t _riter;
+ uint32_t _rowid_count;
+ uint32_t _rowid_left;
};
SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema)
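
To make the backward batch-read semantics easier to follow, here is a minimal standalone sketch (not part of the patch; the bitmap contents, batch size, and the helper name read_batch_backwards are made up for illustration) of the same idea against the CRoaring C API: walk the bitmap from its largest value backwards, but fill the output buffer from the back so each batch comes out in ascending order.

    #include <roaring/roaring.h>

    #include <cstdint>
    #include <cstdio>

    // Reads up to `batch_size` of the largest remaining values into `buf`
    // (ascending order), decrementing `*remaining` as values are consumed.
    static uint32_t read_batch_backwards(roaring_uint32_iterator_t* it, uint32_t* buf,
                                         uint32_t batch_size, uint32_t* remaining) {
        if (!it->has_value || *remaining == 0) {
            return 0;
        }
        uint32_t read_size = batch_size < *remaining ? batch_size : *remaining;
        uint32_t num_read = 0;
        // Walk backwards through the bitmap, filling buf from the back so the
        // values end up in their original (ascending) order.
        while (num_read < read_size && it->has_value) {
            buf[read_size - num_read - 1] = it->current_value;
            ++num_read;
            --(*remaining);
            roaring_previous_uint32_iterator(it);
        }
        return num_read;
    }

    int main() {
        const uint32_t values[] = {0, 1, 4, 5, 6, 7, 10, 15, 16, 17, 18, 19};
        roaring_bitmap_t* bm = roaring_bitmap_create();
        for (uint32_t v : values) {
            roaring_bitmap_add(bm, v);
        }

        roaring_uint32_iterator_t it;
        roaring_init_iterator_last(bm, &it);
        uint32_t remaining = (uint32_t)roaring_bitmap_get_cardinality(bm);

        uint32_t buf[5];
        uint32_t n;
        while ((n = read_batch_backwards(&it, buf, 5, &remaining)) > 0) {
            // Prints "15 16 17 18 19", then "4 5 6 7 10", then "0 1".
            for (uint32_t i = 0; i < n; ++i) {
                std::printf("%u ", buf[i]);
            }
            std::printf("\n");
        }

        roaring_bitmap_free(bm);
        return 0;
    }
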
@@ -1670,56 +1720,86 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
}
}
+/**
+ * Reads columns by their index, handling both continuous and discontinuous rowid scenarios.
+ *
+ * This function is designed to read a specified number of rows (up to nrows_read_limit)
+ * from the segment iterator, dealing with both continuous and discontinuous rowid arrays.
+ * It operates as follows:
+ *
+ * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous.
+ *    Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...).
+ *
+ * 2. For each column that needs to be read (identified by _first_read_column_ids):
+ *    - If the rowids are continuous, the function uses seek_to_ordinal and next_batch
+ *      for efficient reading.
+ *    - If the rowids are not continuous, the function processes them in smaller batches
+ *      (each of size up to 256). Each batch is checked for internal continuity:
+ *      a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch.
+ *      b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch.
+ *
+ * This approach optimizes reading performance by leveraging batch processing for continuous
+ * rowid sequences and handling discontinuities gracefully in smaller chunks.
+ */
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read, bool set_block_rowid) {
SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
- do {
- uint32_t range_from = 0;
- uint32_t range_to = 0;
- bool has_next_range =
- _range_iter->next_range(nrows_read_limit - nrows_read, &range_from, &range_to);
- if (!has_next_range) {
- break;
- }
-
- size_t rows_to_read = range_to - range_from;
- _cur_rowid = range_to;
-
- if (set_block_rowid) {
- // Here use std::iota is better performance than for-loop, maybe for-loop is not vectorized
- auto start = _block_rowids.data() + nrows_read;
- auto end = start + rows_to_read;
- std::iota(start, end, range_from);
- nrows_read += rows_to_read;
- } else {
- nrows_read += rows_to_read;
- }
-
- _split_row_ranges.emplace_back(std::pair {range_from, range_to});
- } while (nrows_read < nrows_read_limit && !_opts.read_orderby_key_reverse);
+ nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
+ bool is_continuous = (nrows_read > 1) &&
+ (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
for (auto cid : _first_read_column_ids) {
auto& column = _current_return_columns[cid];
if (_prune_column(cid, column, true, nrows_read)) {
continue;
}
- for (auto& range : _split_row_ranges) {
- size_t nrows = range.second - range.first;
- {
- _opts.stats->block_first_read_seek_num += 1;
- if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
- SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
- RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first));
- } else {
- RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first));
- }
+
+ if (is_continuous) {
+ size_t rows_read = nrows_read;
+ _opts.stats->block_first_read_seek_num += 1;
+ if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
+ SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
+ RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
+ } else {
+ RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
}
- size_t rows_read = nrows;
RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
- if (rows_read != nrows) {
- return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows,
- rows_read);
+ if (rows_read != nrows_read) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})",
+ nrows_read, rows_read);
+ }
+ } else {
+ const uint32_t batch_size = _range_iter->get_batch_size();
+ uint32_t processed = 0;
+ while (processed < nrows_read) {
+ uint32_t current_batch_size = std::min(batch_size, nrows_read - processed);
+ bool batch_continuous = (current_batch_size > 1) &&
+ (_block_rowids[processed + current_batch_size - 1] - _block_rowids[processed] == current_batch_size - 1);
+
+ if (batch_continuous) {
+ size_t rows_read = current_batch_size;
+ _opts.stats->block_first_read_seek_num += 1;
+ if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
+ SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
+ RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
+ } else {
+ RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
+ }
+ RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
+ if (rows_read != current_batch_size) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "batch nrows({}) != rows_read({})",
current_batch_size, rows_read);
+ }
+ } else {
+ RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
+ &_block_rowids[processed], current_batch_size, column));
+ }
+ processed += current_batch_size;
}
}
}
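
Because the new _read_columns_by_index body is hard to read through the wrapped hunk above, here is a minimal standalone sketch (not part of the patch; the rowids, batch size, and the is_continuous helper are made up, and the column-iterator calls are replaced with prints) of the decision it makes: one seek plus a sequential read when the whole batch of rowids is continuous, otherwise fixed-size sub-batches that fall back to point lookups only where needed.

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // True when rowids[from, from + n) form an unbroken run x, x+1, ..., x+n-1.
    static bool is_continuous(const std::vector<uint32_t>& rowids, size_t from, size_t n) {
        return n > 1 && rowids[from + n - 1] - rowids[from] == n - 1;
    }

    int main() {
        // Hypothetical rowids returned by read_batch_rowids(): a continuous run,
        // then gaps. The real sub-batch size is get_batch_size() (256 in the patch).
        std::vector<uint32_t> rowids = {100, 101, 102, 103, 200, 201, 300};
        const size_t kBatchSize = 4;

        if (is_continuous(rowids, 0, rowids.size())) {
            // Continuous case: one seek + one sequential read for the whole batch.
            std::printf("seek_to_ordinal(%u); next_batch(%zu)\n", rowids[0], rowids.size());
        } else {
            size_t processed = 0;
            while (processed < rowids.size()) {
                size_t n = std::min(kBatchSize, rowids.size() - processed);
                if (is_continuous(rowids, processed, n)) {
                    // Continuous sub-batch: still a single seek + sequential read.
                    std::printf("seek_to_ordinal(%u); next_batch(%zu)\n", rowids[processed], n);
                } else {
                    // Discontinuous sub-batch: read the rowids one by one.
                    std::printf("read_by_rowids(&rowids[%zu], %zu)\n", processed, n);
                }
                processed += n;
            }
        }
        return 0;
    }
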
@@ -1918,8 +1998,6 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
_wait_times_estimate_row_size--;
}
- _split_row_ranges.clear();
- _split_row_ranges.reserve(nrows_read_limit / 2);
RETURN_IF_ERROR(_read_columns_by_index(
nrows_read_limit, _current_batch_rows_read,
_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
@@ -2185,35 +2263,28 @@ void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, uint1
}
}
-void SegmentIterator::_build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
- vectorized::Block* block,
+void SegmentIterator::_build_index_result_column(const uint16_t* sel_rowid_idx,
+ uint16_t select_size, vectorized::Block* block,
const std::string& pred_result_sign,
const roaring::Roaring& index_result) {
auto index_result_column = vectorized::ColumnUInt8::create();
vectorized::ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
vec_match_pred.resize(block->rows());
- size_t idx_in_block = 0;
- size_t idx_in_row_range = 0;
size_t idx_in_selected = 0;
- // _split_row_ranges store multiple ranges which split in function _read_columns_by_index(),
- // index_result is a column predicate apply result in a whole segement,
- // but a scanner thread one time can read max rows limit by block_row_max,
- // so split _row_bitmap by one time scan range, in order to match size of one scanner thread read rows.
- for (auto origin_row_range : _split_row_ranges) {
- for (size_t rowid = origin_row_range.first; rowid < origin_row_range.second; ++rowid) {
- if (sel_rowid_idx == nullptr || (idx_in_selected < select_size &&
- idx_in_row_range == sel_rowid_idx[idx_in_selected])) {
- if (index_result.contains(rowid)) {
- vec_match_pred[idx_in_block++] = true;
- } else {
- vec_match_pred[idx_in_block++] = false;
- }
- idx_in_selected++;
+
+ for (uint32_t i = 0; i < _current_batch_rows_read; i++) {
+ auto rowid = _block_rowids[i];
+ if (sel_rowid_idx == nullptr ||
+ (idx_in_selected < select_size && i == sel_rowid_idx[idx_in_selected])) {
+ if (index_result.contains(rowid)) {
+ vec_match_pred[idx_in_selected] = true;
+ } else {
+ vec_match_pred[idx_in_selected] = false;
}
- idx_in_row_range++;
+ idx_in_selected++;
}
}
- assert(block->rows() == vec_match_pred.size());
+ DCHECK(block->rows() == vec_match_pred.size());
auto index_result_position = block->get_position_by_name(pred_result_sign);
block->replace_by_position(index_result_position, std::move(index_result_column));
}
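
The rewritten _build_index_result_column walks the batch's rowids directly instead of the removed _split_row_ranges. The sketch below (not part of the patch; the batch contents are made up and roaring::Roaring is replaced with std::set) shows how the selection indexes map positions in the batch to per-selected-row match flags.

    #include <cstdint>
    #include <cstdio>
    #include <set>
    #include <vector>

    int main() {
        // Rowids read in this batch, the positions the predicates selected
        // (indexes into the batch, not rowids), and the rowids the index matched.
        std::vector<uint32_t> block_rowids = {10, 11, 12, 20, 21};
        std::vector<uint16_t> sel_rowid_idx = {1, 3, 4};
        std::set<uint32_t> index_result = {11, 21};

        // One flag per selected row, aligned with the filtered block.
        std::vector<uint8_t> vec_match_pred(sel_rowid_idx.size(), 0);

        size_t idx_in_selected = 0;
        for (uint32_t i = 0; i < block_rowids.size(); ++i) {
            if (idx_in_selected < sel_rowid_idx.size() && i == sel_rowid_idx[idx_in_selected]) {
                vec_match_pred[idx_in_selected] =
                        index_result.count(block_rowids[i]) ? 1 : 0;
                ++idx_in_selected;
            }
        }

        for (size_t i = 0; i < vec_match_pred.size(); ++i) {
            // Prints: rowid 11 -> 1, rowid 20 -> 0, rowid 21 -> 1
            std::printf("rowid %u -> %d\n", block_rowids[sel_rowid_idx[i]], (int)vec_match_pred[i]);
        }
        return 0;
    }
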
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 33d3a3f5f9c..352929678b3 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -259,7 +259,7 @@ private:
std::string _gen_predicate_result_sign(ColumnPredicate* predicate);
std::string _gen_predicate_result_sign(ColumnPredicateInfo* predicate_info);
- void _build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
+ void _build_index_result_column(const uint16_t* sel_rowid_idx, uint16_t select_size,
vectorized::Block* block, const std::string& pred_result_sign,
const roaring::Roaring& index_result);
void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
@@ -340,7 +340,6 @@ private:
roaring::Roaring _row_bitmap;
// "column_name+operator+value-> <in_compound_query, rowid_result>
std::unordered_map<std::string, std::pair<bool, roaring::Roaring>> _rowid_result_for_index;
- std::vector<std::pair<uint32_t, uint32_t>> _split_row_ranges;
// an iterator for `_row_bitmap` that can be used to extract row range to scan
std::unique_ptr<BitmapRangeIterator> _range_iter;
// the next rowid to read
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]