This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 12f02ebf090 [opt](be) Batch row_id reads in seek_and_read_by_rowid to
reduce column iterator overhead (#63436)
12f02ebf090 is described below
commit 12f02ebf090f707b3e745ba98563957b792f561a
Author: HappenLee <[email protected]>
AuthorDate: Thu May 21 12:00:36 2026 +0800
[opt](be) Batch row_id reads in seek_and_read_by_rowid to reduce column
iterator overhead (#63436)
Change seek_and_read_by_rowid to accept a batch of row_ids instead of a
single row_id, allowing the underlying column iterator's read_by_rowids
to process all rows in one call. This eliminates per-row iterator
re-initialization overhead in multi-row fetch paths (point query, batch
index lookup).
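read_batch_doris_format_row now groups all requested rows by segment
(including non-adjacent ones), sorts and deduplicates each segment's
row_ids, reads each segment in a single call, and scatters the results
back to the original request order. seek_and_read_by_rowid now requires
its row_ids to be strictly increasing.
This gives roughly a 10% speedup.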
---
be/src/exec/rowid_fetcher.cpp | 136 +++++++++++++++++++++-----------
be/src/service/point_query_executor.cpp | 5 +-
be/src/storage/segment/segment.cpp | 18 +++--
be/src/storage/segment/segment.h | 5 +-
4 files changed, 111 insertions(+), 53 deletions(-)
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index f97bce17a8c..27c66197541 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -357,6 +357,36 @@ struct SegItem {
SegmentSharedPtr segment;
};
+// One read batch per segment of the Doris storage format. Every requested row that
+// lives in that segment is collected here together with its index in the original
+// request, so all of them can be read in a single call and the results written back
+// to the correct output positions afterwards.
+struct DorisFormatReadBatch {
+    // Element type of row_ids_with_positions: (row_id inside the segment, index in the request).
+    using RowIdWithPosition = std::pair<segment_v2::rowid_t, size_t>;
+
+    // Mapping used to resolve which tablet/rowset/segment this batch reads from.
+    std::shared_ptr<FileMapping> file_mapping;
+    // Every requested row of this segment; order is whatever the caller establishes.
+    std::vector<RowIdWithPosition> row_ids_with_positions;
+};
+
+// Copies rows from the per-batch scan blocks into `result_block` in original
+// request order.
+//
+// row_id_block_idx[i] = (index into scan_blocks, row index inside that scan block)
+// names the source of the i-th inserted row. Every scan block must have at least as
+// many columns as result_block, in the same order (DCHECKed below). Rows are added to
+// result_block's columns with insert_from_multi_column.
+static void scatter_scan_blocks_to_result_block(
+        const std::vector<std::pair<size_t, size_t>>& row_id_block_idx,
+        std::vector<Block>& scan_blocks, Block& result_block) {
+    // Source row positions do not depend on the column, so compute them once
+    // instead of rebuilding an identical vector for every column.
+    std::vector<size_t> scan_positions;
+    scan_positions.reserve(row_id_block_idx.size());
+    for (const auto& [pos_block, block_idx] : row_id_block_idx) {
+        DCHECK(scan_blocks.size() > pos_block);
+        scan_positions.emplace_back(block_idx);
+    }
+
+    // Reused for every column; clear() keeps the capacity reserved here.
+    std::vector<const IColumn*> scan_src_columns;
+    scan_src_columns.reserve(row_id_block_idx.size());
+    for (size_t column_id = 0; column_id < result_block.columns(); ++column_id) {
+        // result_block itself is non-const, but each of its columns is held through an
+        // immutable COW pointer, so const_cast is needed to insert into it.
+        auto* dst_col =
+                const_cast<IColumn*>(result_block.get_by_position(column_id).column.get());
+
+        scan_src_columns.clear();
+        for (const auto& entry : row_id_block_idx) {
+            const Block& src_block = scan_blocks[entry.first];
+            DCHECK(src_block.columns() > column_id);
+            scan_src_columns.emplace_back(src_block.get_by_position(column_id).column.get());
+        }
+        dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
+    }
+}
+
Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
PMultiGetResponse* response) {
// read from storage engine row id by row id
@@ -460,7 +490,8 @@ Status RowIdStorageReader::read_by_rowids(const
PMultiGetRequest& request,
row_location.row_location.segment_id,
row_location.row_location.row_id);
for (int x = 0; x < slots.size(); ++x) {
- auto row_id =
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
+ std::vector<segment_v2::rowid_t> row_ids {
+ static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
MutableColumnPtr column =
result_block.get_by_position(x).column->assume_mutable();
IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
.rowset_id = rowset_id,
@@ -475,8 +506,8 @@ Status RowIdStorageReader::read_by_rowids(const
PMultiGetRequest& request,
}
segment = iterator_item.segment;
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
- full_read_schema, &slots[x], row_id, column,
iterator_item.storage_read_options,
- iterator_item.iterator));
+ full_read_schema, &slots[x], row_ids, column,
+ iterator_item.storage_read_options,
iterator_item.iterator));
}
}
// serialize block if not empty
@@ -656,36 +687,72 @@ Status RowIdStorageReader::read_batch_doris_format_row(
}
}
- std::vector<uint32_t> row_ids;
- int k = 1;
- auto max_k = 0;
- for (int j = 0; j < request_block_desc.row_id_size();) {
+ // Phase 1: Group all row_ids by their (tablet_id, rowset_id, segment_id)
key.
+ // Unlike the old code which only batched adjacent rows with the same
file_id,
+ // this merges non-contiguous same-segment requests into a single batch,
+ // maximizing the number of rows read per seek_and_read_by_rowid call.
+ std::vector<DorisFormatReadBatch> scan_batches;
+ std::unordered_map<SegKey, size_t, HashOfSegKey> batch_idx_by_seg;
+ // (batch_idx, position_in_batch) for each row in the original request.
+ std::vector<std::pair<size_t, size_t>>
row_id_block_idx(request_block_desc.row_id_size());
+ for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
auto file_id = request_block_desc.file_id(j);
- row_ids.emplace_back(request_block_desc.row_id(j));
auto file_mapping = id_file_map->get_file_mapping(file_id);
if (!file_mapping) {
return Status::InternalError(
"Backend:{} file_mapping not found, query_id: {}, file_id:
{}",
BackendOptions::get_localhost(), print_id(query_id),
file_id);
}
- for (k = 1; j + k < request_block_desc.row_id_size(); ++k) {
- if (request_block_desc.file_id(j + k) == file_id) {
- row_ids.emplace_back(request_block_desc.row_id(j + k));
- } else {
- break;
- }
+
+ // Derive segment key and group by it — rows from the same segment are
batched together
+ // even if they are interleaved with rows from other segments in the
request.
+ auto [tablet_id, rowset_id, segment_id] =
file_mapping->get_doris_format_info();
+ SegKey seg_key {.tablet_id = tablet_id, .rowset_id = rowset_id,
.segment_id = segment_id};
+ auto [it, inserted] = batch_idx_by_seg.emplace(seg_key,
scan_batches.size());
+ if (inserted) {
+ // First time seeing this segment, create a new batch for it.
+ scan_batches.emplace_back();
+ scan_batches.back().file_mapping = file_mapping;
}
+ // Record (row_id, original_request_index) for later sorting and
scattering.
+
scan_batches[it->second].row_ids_with_positions.emplace_back(request_block_desc.row_id(j),
+ j);
+ }
- RETURN_IF_ERROR(read_doris_format_row(
- id_file_map, file_mapping, row_ids, slots, full_read_schema,
row_store_read_struct,
- stats, acquire_tablet_ms, acquire_rowsets_ms,
acquire_segments_ms,
- lookup_row_data_ms, seg_map, iterator_map, result_block));
+ // Phase 2: For each segment, sort row_ids ascending (required by
ColumnIterator),
+ // deduplicate, then read all rows in a single batch call.
+ std::vector<Block> scan_blocks(scan_batches.size());
+ for (size_t batch_idx = 0; batch_idx < scan_batches.size(); ++batch_idx) {
+ auto& scan_batch = scan_batches[batch_idx];
+ auto& row_ids_with_positions = scan_batch.row_ids_with_positions;
+ std::sort(row_ids_with_positions.begin(), row_ids_with_positions.end(),
+ [](const auto& lhs, const auto& rhs) { return lhs.first <
rhs.first; });
+
+ // Column iterators read rowids monotonically. Deduplicate consecutive
identical row_ids
+ // (different file_ids may map to the same row), then scatter rows
back to their original
+ // request positions.
+ std::vector<uint32_t> row_ids;
+ row_ids.reserve(row_ids_with_positions.size());
+
+ // Also builds the scatter map: row_id_block_idx[original_request_idx]
->
+ // (batch_idx, deduplicated_position_in_batch).
+ for (const auto& [row_id, result_idx] : row_ids_with_positions) {
+ if (row_ids.empty() || row_ids.back() != row_id) {
+ row_ids.emplace_back(row_id);
+ }
+ row_id_block_idx[result_idx] = std::make_pair(batch_idx,
row_ids.size() - 1);
+ }
- j += k;
- max_k = std::max(max_k, k);
- row_ids.clear();
+ scan_blocks[batch_idx] = Block(slots, row_ids.size());
+ RETURN_IF_ERROR(read_doris_format_row(id_file_map,
scan_batch.file_mapping, row_ids, slots,
+ full_read_schema,
row_store_read_struct, stats,
+ acquire_tablet_ms,
acquire_rowsets_ms,
+ acquire_segments_ms,
lookup_row_data_ms, seg_map,
+ iterator_map,
scan_blocks[batch_idx]));
}
+ scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks,
result_block);
+
return Status::OK();
}
@@ -929,24 +996,7 @@ Status RowIdStorageReader::read_batch_external_row(
},
&scan_running_time));
- // Insert the read data into result_block.
- for (size_t column_id = 0; column_id < result_block.get_columns().size();
column_id++) {
- // The non-const Block(result_block) is passed in read_by_rowids, but
columns[i] in get_columns
- // is at bottom an immutable_ptr of Cow<IColumn>, so use const_cast
- auto dst_col =
const_cast<IColumn*>(result_block.get_columns()[column_id].get());
-
- std::vector<const IColumn*> scan_src_columns;
- scan_src_columns.reserve(row_id_block_idx.size());
- std::vector<size_t> scan_positions;
- scan_positions.reserve(row_id_block_idx.size());
- for (const auto& [pos_block, block_idx] : row_id_block_idx) {
- DCHECK(scan_blocks.size() > pos_block);
- DCHECK(scan_blocks[pos_block].get_columns().size() > column_id);
-
scan_src_columns.emplace_back(scan_blocks[pos_block].get_columns()[column_id].get());
- scan_positions.emplace_back(block_idx);
- }
- dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
- }
+ scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks,
result_block);
// Statistical runtime profile information.
std::unique_ptr<RuntimeProfile> runtime_profile =
@@ -1101,11 +1151,9 @@ Status RowIdStorageReader::read_doris_format_row(
iterator_item.storage_read_options.stats = &stats;
iterator_item.storage_read_options.io_ctx.reader_type =
ReaderType::READER_QUERY;
}
- for (auto row_id : row_ids) {
- RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
- full_read_schema, &slots[x], row_id, column,
- iterator_item.storage_read_options,
iterator_item.iterator));
- }
+ RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
+ full_read_schema, &slots[x], row_ids, column,
+ iterator_item.storage_read_options,
iterator_item.iterator));
}
}
return Status::OK();
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index 441284a251b..af34a3fe1d4 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -556,7 +556,8 @@ Status PointQueryExecutor::_lookup_row_data() {
const auto& segment = *it;
for (int cid : _reusable->missing_col_uids()) {
int pos = _reusable->get_col_uid_to_idx().at(cid);
- auto row_id = static_cast<segment_v2::rowid_t>(row_loc.row_id);
+ std::vector<segment_v2::rowid_t> row_ids {
+ static_cast<segment_v2::rowid_t>(row_loc.row_id)};
MutableColumnPtr column =
_result_block->get_by_position(pos).column->assume_mutable();
std::unique_ptr<ColumnIterator> iter;
@@ -565,7 +566,7 @@ Status PointQueryExecutor::_lookup_row_data() {
storage_read_options.stats = &_read_stats;
storage_read_options.io_ctx.reader_type =
ReaderType::READER_QUERY;
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot,
- row_id, column,
+ row_ids,
column,
storage_read_options, iter));
if (_tablet->tablet_schema()
->column_by_uid(slot->col_unique_id())
diff --git a/be/src/storage/segment/segment.cpp
b/be/src/storage/segment/segment.cpp
index 1c5578c1388..5c3c3f74202 100644
--- a/be/src/storage/segment/segment.cpp
+++ b/be/src/storage/segment/segment.cpp
@@ -23,6 +23,7 @@
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/segment_v2.pb.h>
+#include <algorithm>
#include <cstring>
#include <memory>
#include <sstream>
@@ -944,9 +945,17 @@ Status Segment::read_key_by_rowid(uint32_t row_id,
std::string* key) {
}
Status Segment::seek_and_read_by_rowid(const TabletSchema& schema,
SlotDescriptor* slot,
- uint32_t row_id, MutableColumnPtr&
result,
+ const std::vector<uint32_t>& row_ids,
+ MutableColumnPtr& result,
StorageReadOptions&
storage_read_options,
std::unique_ptr<ColumnIterator>&
iterator_hint) {
+ if (row_ids.empty()) {
+ return Status::OK();
+ }
+ DORIS_CHECK(std::is_sorted(row_ids.begin(), row_ids.end()));
+ DORIS_CHECK(std::adjacent_find(row_ids.begin(), row_ids.end()) ==
row_ids.end());
+ // ColumnIterator::seek_and_read expects monotonically increasing row_ids
without
+ // duplicates for correct ordinal scanning. Enforce this contract at the
entry point.
segment_v2::ColumnIteratorOptions opt {
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = file_reader().get(),
@@ -956,7 +965,6 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema&
schema, SlotDescripto
&storage_read_options.stats->file_cache_stats},
};
- std::vector<segment_v2::rowid_t> single_row_loc {row_id};
if (!slot->column_paths().empty()) {
// here need create column readers to make sure column reader is
created before seek_and_read_by_rowid
// if segment cache miss, column reader will be created to make sure
the variant column result not coredump
@@ -977,13 +985,13 @@ Status Segment::seek_and_read_by_rowid(const
TabletSchema& schema, SlotDescripto
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(
- iterator_hint->read_by_rowids(single_row_loc.data(), 1,
file_storage_column));
+ iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(),
file_storage_column));
ColumnPtr source_ptr;
// storage may have different type with schema, so we need to cast the
column
RETURN_IF_ERROR(variant_util::cast_column(
ColumnWithTypeAndName(file_storage_column->get_ptr(),
storage_type, column.name()),
slot->type(), &source_ptr));
- RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0,
1));
+ RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0,
row_ids.size()));
} else {
int index = (slot->col_unique_id() >= 0) ?
schema.field_index(slot->col_unique_id())
:
schema.field_index(slot->col_name());
@@ -998,7 +1006,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema&
schema, SlotDescripto
&storage_read_options));
RETURN_IF_ERROR(iterator_hint->init(opt));
}
- RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(),
1, result));
+ RETURN_IF_ERROR(iterator_hint->read_by_rowids(row_ids.data(),
row_ids.size(), result));
}
return Status::OK();
}
diff --git a/be/src/storage/segment/segment.h b/be/src/storage/segment/segment.h
index f18be6c093a..465d20343bf 100644
--- a/be/src/storage/segment/segment.h
+++ b/be/src/storage/segment/segment.h
@@ -144,8 +144,9 @@ public:
Status read_key_by_rowid(uint32_t row_id, std::string* key);
- Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor*
slot, uint32_t row_id,
- MutableColumnPtr& result,
+ // row_ids must be strictly increasing.
+ Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor*
slot,
+ const std::vector<uint32_t>& row_ids,
MutableColumnPtr& result,
StorageReadOptions& storage_read_options,
std::unique_ptr<ColumnIterator>&
iterator_hint);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]