HappenLee commented on code in PR #52114:
URL: https://github.com/apache/doris/pull/52114#discussion_r2188900790
##########
be/src/exec/rowid_fetcher.cpp:
##########
@@ -717,19 +749,226 @@ Status RowIdStorageReader::read_batch_external_row(const
PRequestBlockDesc& requ
BackendOptions::get_localhost(), print_id(query_id),
file_id);
}
- auto& external_info = file_mapping->get_external_file_info();
- auto& scan_range_desc = external_info.scan_range_desc;
+ const auto& external_info = file_mapping->get_external_file_info();
+ const auto& scan_range_desc = external_info.scan_range_desc;
+
+ auto scan_range_hash = hash_file_range(scan_range_desc);
+ if (scan_rows.contains(scan_range_hash)) {
+
scan_rows.at(scan_range_hash).emplace(request_block_desc.row_id(j), j);
+ } else {
+ map<segment_v2::rowid_t, size_t> tmp
{{request_block_desc.row_id(j), j}};
+ scan_rows.emplace(scan_range_hash, tmp);
+ }
+ }
+
+ scan_blocks.resize(scan_rows.size());
+ row_id_block_idx.resize(request_block_desc.row_id_size());
+ init_reader_ms.resize(scan_rows.size(), 0);
+ get_block_ms.resize(scan_rows.size(), 0);
+ file_read_bytes.resize(scan_rows.size());
+ file_read_times.resize(scan_rows.size());
+
+ // Get the workload group for subsequent scan task submission.
+ std::vector<uint64_t> workload_group_ids;
+ workload_group_ids.emplace_back(workload_group_id);
+ auto wg =
ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
+ doris::pipeline::TaskScheduler* exec_sched = nullptr;
+ vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
+ vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
+ wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
+ DCHECK(remote_scan_sched);
+
+ int64_t scan_running_time = 0;
+ RETURN_IF_ERROR(scope_timer_run(
+ [&]() -> Status {
+ // Make sure to insert data into result_block only after all
scan tasks have been executed.
+ std::atomic<int> producer_count {0};
+ std::condition_variable cv;
+ std::mutex mtx;
+
+ //semaphore: Limit the number of scan tasks submitted at one
time
+ std::counting_semaphore semaphore {max_file_scanners};
+
+ size_t idx = 0;
+ for (const auto& [_, row_ids] : scan_rows) {
+ semaphore.acquire();
+ RETURN_IF_ERROR(
+
remote_scan_sched->submit_scan_task(vectorized::SimplifiedScanTask(
+ [&, row_ids, idx]() {
+ SCOPED_ATTACH_TASK(
+ ExecEnv::GetInstance()
+
->rowid_storage_reader_tracker());
+ signal::set_signal_task_id(query_id);
+
+ scan_blocks[idx] =
vectorized::Block(slots, row_ids.size());
+
+ size_t j = 0;
+ list<int64_t> read_ids;
+ //Generate an ordered list with the
help of the orderliness of the map.
+ for (const auto& [row_id,
result_block_idx] : row_ids) {
+ read_ids.emplace_back(row_id);
+ row_id_block_idx[result_block_idx]
= make_pair(idx, j);
+ j++;
+ }
+
+ auto file_id =
+
request_block_desc.file_id(row_ids.begin()->second);
+ auto file_mapping =
id_file_map->get_file_mapping(file_id);
+ if (!file_mapping) {
Review Comment:
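This check is already done at line 747, so there is no need to repeat it here. We should reuse the work done at line 747 — for example, keep the `file_mapping` shared_ptr in a struct stored alongside the row IDs in `scan_rows`, so the scan task does not have to look up `file_id` and `file_mapping` again.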
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]