HappenLee commented on code in PR #52114:
URL: https://github.com/apache/doris/pull/52114#discussion_r2188913791
##########
be/src/exec/rowid_fetcher.cpp:
##########
@@ -717,19 +749,226 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ
                                 BackendOptions::get_localhost(), print_id(query_id), file_id);
         }
-        auto& external_info = file_mapping->get_external_file_info();
-        auto& scan_range_desc = external_info.scan_range_desc;
+        const auto& external_info = file_mapping->get_external_file_info();
+        const auto& scan_range_desc = external_info.scan_range_desc;
+
+        auto scan_range_hash = hash_file_range(scan_range_desc);
+        if (scan_rows.contains(scan_range_hash)) {
+            scan_rows.at(scan_range_hash).emplace(request_block_desc.row_id(j), j);
+        } else {
+            map<segment_v2::rowid_t, size_t> tmp {{request_block_desc.row_id(j), j}};
+            scan_rows.emplace(scan_range_hash, tmp);
+        }
+    }
+
+    scan_blocks.resize(scan_rows.size());
+    row_id_block_idx.resize(request_block_desc.row_id_size());
+    init_reader_ms.resize(scan_rows.size(), 0);
+    get_block_ms.resize(scan_rows.size(), 0);
+    file_read_bytes.resize(scan_rows.size());
+    file_read_times.resize(scan_rows.size());
+
+    // Get the workload group for subsequent scan task submission.
+    std::vector<uint64_t> workload_group_ids;
+    workload_group_ids.emplace_back(workload_group_id);
+    auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
+    doris::pipeline::TaskScheduler* exec_sched = nullptr;
+    vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
+    vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
+    wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
+    DCHECK(remote_scan_sched);
+
+    int64_t scan_running_time = 0;
+    RETURN_IF_ERROR(scope_timer_run(
+            [&]() -> Status {
+                // Make sure to insert data into result_block only after all scan tasks have been executed.
+                std::atomic<int> producer_count {0};
+                std::condition_variable cv;
+                std::mutex mtx;
+
+                // semaphore: limit the number of scan tasks submitted at one time
+                std::counting_semaphore semaphore {max_file_scanners};
+
+                size_t idx = 0;
+                for (const auto& [_, row_ids] : scan_rows) {
+                    semaphore.acquire();
+                    RETURN_IF_ERROR(remote_scan_sched->submit_scan_task(vectorized::SimplifiedScanTask(
+                            [&, row_ids, idx]() {
+                                SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
+                                signal::set_signal_task_id(query_id);
+
+                                scan_blocks[idx] = vectorized::Block(slots, row_ids.size());
+
+                                size_t j = 0;
+                                list<int64_t> read_ids;
+                                // Generate an ordered list with the help of the orderliness of the map.
+                                for (const auto& [row_id, result_block_idx] : row_ids) {
+                                    read_ids.emplace_back(row_id);
+                                    row_id_block_idx[result_block_idx] = make_pair(idx, j);
+                                    j++;
+                                }
+
+                                auto file_id = request_block_desc.file_id(row_ids.begin()->second);
+                                auto file_mapping = id_file_map->get_file_mapping(file_id);
+                                if (!file_mapping) {
+                                    return Status::InternalError(
+                                            "Backend:{} file_mapping not found, query_id: "
+                                            "{}, file_id: {}",
+                                            BackendOptions::get_localhost(),
+                                            print_id(query_id), file_id);
+                                }
+
+                                auto& external_info = file_mapping->get_external_file_info();
+                                auto& scan_range_desc = external_info.scan_range_desc;
+
+                                // Clear to avoid reading iceberg position delete file...
+                                scan_range_desc.table_format_params.iceberg_params =
+                                        TIcebergFileDesc {};
+
+                                // Clear to avoid reading hive transactional delete delta file...
+                                scan_range_desc.table_format_params.transactional_hive_params =
+                                        TTransactionalHiveDesc {};
+
+                                std::unique_ptr<RuntimeProfile> sub_runtime_profile =
+                                        std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
+                                {
+                                    std::unique_ptr<vectorized::FileScanner> vfile_scanner_ptr =
+                                            vectorized::FileScanner::create_unique(
+                                                    runtime_state.get(), sub_runtime_profile.get(),
+                                                    &rpc_scan_params, &colname_to_slot_id,
+                                                    &tuple_desc);
+
+                                    RETURN_IF_ERROR(vfile_scanner_ptr->prepare_for_read_lines(
+                                            scan_range_desc));
+                                    RETURN_IF_ERROR(vfile_scanner_ptr->read_lines_from_range(
+                                            scan_range_desc, read_ids,
+                                            &scan_blocks[idx], external_info,
+                                            &init_reader_ms[idx], &get_block_ms[idx]));
+                                }
+
+                                auto file_read_bytes_counter = sub_runtime_profile->get_counter(
+                                        vectorized::FileScanner::FileReadBytesProfile);
+                                if (file_read_bytes_counter != nullptr) {
+                                    file_read_bytes[idx] = PrettyPrinter::print(
+                                            file_read_bytes_counter->value(),
+                                            file_read_bytes_counter->type());
+                                }
+
+                                auto file_read_times_counter = sub_runtime_profile->get_counter(
+                                        vectorized::FileScanner::FileReadTimeProfile);
+                                if (file_read_times_counter != nullptr) {
+                                    file_read_times[idx] = PrettyPrinter::print(
+                                            file_read_times_counter->value(),
+                                            file_read_times_counter->type());
+                                }
+
+                                semaphore.release();
+                                if (++producer_count == scan_rows.size()) {
+                                    std::lock_guard<std::mutex> lock(mtx);
+                                    cv.notify_one();
+                                }
+                                return Status::OK();
+                            },
+                            nullptr)));
+                    idx++;
+                }
-        // Clear to avoid reading iceberg position delete file...
-        scan_range_desc.table_format_params.iceberg_params = TIcebergFileDesc {};
+                {
+                    std::unique_lock<std::mutex> lock(mtx);
+                    cv.wait(lock, [&] { return producer_count == scan_rows.size(); });
+                }
+                return Status::OK();
+            },
+            &scan_running_time));
+
+    // Insert the read data into result_block.
+    for (const auto& [pos_block, block_idx] : row_id_block_idx) {
+        for (size_t column_id = 0; column_id < result_block.get_columns().size(); column_id++) {
+            auto dst_col =
+                    const_cast<vectorized::IColumn*>(result_block.get_columns()[column_id].get());
-        // Clear to avoid reading hive transactional delete delta file...
-        scan_range_desc.table_format_params.transactional_hive_params = TTransactionalHiveDesc {};
+            DCHECK(scan_blocks.size() > pos_block);
+            DCHECK(scan_blocks[pos_block].get_columns().size() > column_id);
+            auto& src_col = *scan_blocks[pos_block].get_columns()[column_id].get();
-        RETURN_IF_ERROR(vfile_scanner_ptr->read_one_line_from_range(
-                scan_range_desc, request_block_desc.row_id(j), &result_block, external_info,
-                init_reader_ms, get_block_ms));
+            dst_col->insert_range_from(src_col, block_idx, 1);
Review Comment:
   Since the length is always 1, `insert_from` can do the work. If you want to speed up the call, you can also use `insert_from_multi_column`.
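
   A minimal sketch of the suggested change (an illustration of this suggestion, not committed code; it assumes the usual Doris `IColumn::insert_from(const IColumn& src, size_t position)` signature, and only mentions `insert_from_multi_column` in a comment since its exact signature is not shown in this thread):

   ```cpp
   // Insert the read data into result_block, one row per mapping entry.
   for (const auto& [pos_block, block_idx] : row_id_block_idx) {
       for (size_t column_id = 0; column_id < result_block.get_columns().size(); column_id++) {
           auto* dst_col = const_cast<vectorized::IColumn*>(
                   result_block.get_columns()[column_id].get());
           const auto& src_col = *scan_blocks[pos_block].get_columns()[column_id];
           // The copy length is always 1, so insert_from avoids the range
           // bookkeeping of insert_range_from(src_col, block_idx, 1).
           // To go further, rows targeting the same destination column could be
           // gathered and handed to insert_from_multi_column in one call
           // (signature assumed, per the note above).
           dst_col->insert_from(src_col, block_idx);
       }
   }
   ```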