HappenLee commented on code in PR #52114:
URL: https://github.com/apache/doris/pull/52114#discussion_r2188904669
##########
be/src/exec/rowid_fetcher.cpp:
##########
@@ -717,19 +749,226 @@ Status RowIdStorageReader::read_batch_external_row(const
PRequestBlockDesc& requ
BackendOptions::get_localhost(), print_id(query_id),
file_id);
}
- auto& external_info = file_mapping->get_external_file_info();
- auto& scan_range_desc = external_info.scan_range_desc;
+ const auto& external_info = file_mapping->get_external_file_info();
+ const auto& scan_range_desc = external_info.scan_range_desc;
+
+ auto scan_range_hash = hash_file_range(scan_range_desc);
+ if (scan_rows.contains(scan_range_hash)) {
+
scan_rows.at(scan_range_hash).emplace(request_block_desc.row_id(j), j);
+ } else {
+ map<segment_v2::rowid_t, size_t> tmp
{{request_block_desc.row_id(j), j}};
+ scan_rows.emplace(scan_range_hash, tmp);
+ }
+ }
+
+ scan_blocks.resize(scan_rows.size());
+ row_id_block_idx.resize(request_block_desc.row_id_size());
+ init_reader_ms.resize(scan_rows.size(), 0);
+ get_block_ms.resize(scan_rows.size(), 0);
+ file_read_bytes.resize(scan_rows.size());
+ file_read_times.resize(scan_rows.size());
+
+ // Get the workload group for subsequent scan task submission.
+ std::vector<uint64_t> workload_group_ids;
+ workload_group_ids.emplace_back(workload_group_id);
+ auto wg =
ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
+ doris::pipeline::TaskScheduler* exec_sched = nullptr;
+ vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
+ vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
+ wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
+ DCHECK(remote_scan_sched);
+
+ int64_t scan_running_time = 0;
+ RETURN_IF_ERROR(scope_timer_run(
+ [&]() -> Status {
+ // Make sure to insert data into result_block only after all
scan tasks have been executed.
+ std::atomic<int> producer_count {0};
+ std::condition_variable cv;
+ std::mutex mtx;
+
+ //semaphore: Limit the number of scan tasks submitted at one
time
+ std::counting_semaphore semaphore {max_file_scanners};
+
+ size_t idx = 0;
+ for (const auto& [_, row_ids] : scan_rows) {
+ semaphore.acquire();
+ RETURN_IF_ERROR(
+
remote_scan_sched->submit_scan_task(vectorized::SimplifiedScanTask(
+ [&, row_ids, idx]() {
+ SCOPED_ATTACH_TASK(
+ ExecEnv::GetInstance()
+
->rowid_storage_reader_tracker());
+ signal::set_signal_task_id(query_id);
+
+ scan_blocks[idx] =
vectorized::Block(slots, row_ids.size());
+
+ size_t j = 0;
+ list<int64_t> read_ids;
Review Comment:
Why use a `list` for `read_ids` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]