gavinchou commented on code in PR #57587:
URL: https://github.com/apache/doris/pull/57587#discussion_r2483582094
##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -170,110 +170,151 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
<< ", response=" << response->DebugString();
}
+namespace {
+// Helper functions for fetch_peer_data
+
+Status handle_peer_file_range_request(const std::string& path,
PFetchPeerDataResponse* response) {
+ // Read specific range [file_offset, file_offset+file_size) across cached
blocks
+ auto datas =
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+ for (auto& cb : datas) {
+ *(response->add_datas()) = std::move(cb);
+ }
+ return Status::OK();
+}
+
+void set_error_response(PFetchPeerDataResponse* response, const std::string&
error_msg) {
+ response->mutable_status()->add_error_msgs(error_msg);
+ response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+}
+
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
+ doris::CacheBlockPB* output) {
+ std::string data;
+ data.resize(file_block->range().size());
+
+ auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ Slice slice(data.data(), data.size());
+ Status read_st = file_block->read(slice, /*read_offset=*/0);
+
+ auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts -
begin_read_file_ts);
+
+ if (read_st.ok()) {
+ output->set_data(std::move(data));
+ return Status::OK();
+ } else {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed: " << read_st;
+ return read_st;
+ }
+}
+
+Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest*
request,
+ PFetchPeerDataResponse* response) {
+ const auto& path = request->path();
+ auto hash = io::BlockFileCache::hash(path);
+ auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+ if (cache == nullptr) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ set_error_response(response, "can't get file cache instance");
+ return Status::InternalError("can't get file cache instance");
+ }
+
+ io::CacheContext ctx {};
+ io::ReadStatistics local_stats;
+ ctx.stats = &local_stats;
+
+ for (const auto& cb_req : request->cache_req()) {
+ size_t offset = static_cast<size_t>(std::max<int64_t>(0,
cb_req.block_offset()));
+ size_t size = static_cast<size_t>(std::max<int64_t>(0,
cb_req.block_size()));
+ auto holder = cache->get_or_set(hash, offset, size, ctx);
+
+ for (auto& fb : holder.file_blocks) {
+ if (fb->state() != io::FileBlock::State::DOWNLOADED) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed, state=" <<
fb->state();
+ set_error_response(response, "read cache file error");
+ return Status::InternalError("cache block not downloaded");
+ }
+
+ g_file_cache_get_by_peer_blocks_num << 1;
+ doris::CacheBlockPB* out = response->add_datas();
+ out->set_block_offset(static_cast<int64_t>(fb->offset()));
+ out->set_block_size(static_cast<int64_t>(fb->range().size()));
+
+ Status read_status = read_file_block(fb, out);
+ if (!read_status.ok()) {
+ set_error_response(response, "read cache file error");
+ return read_status;
+ }
+ }
+ }
+
+ return Status::OK();
+}
+} // namespace
+
void
CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController*
controller
[[maybe_unused]],
const PFetchPeerDataRequest*
request,
PFetchPeerDataResponse*
response,
google::protobuf::Closure*
done) {
- // TODO(dx): use async thread pool to handle the request, not AsyncIO
- brpc::ClosureGuard closure_guard(done);
- g_file_cache_get_by_peer_num << 1;
- if (!config::enable_file_cache) {
- LOG_WARNING("try to access file cache data, but file cache not
enabled");
- return;
- }
- int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
- const auto type = request->type();
- const auto& path = request->path();
- response->mutable_status()->set_status_code(TStatusCode::OK);
- if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
- // Read specific range [file_offset, file_offset+file_size) across
cached blocks
- auto datas =
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
- for (auto& cb : datas) {
- *(response->add_datas()) = std::move(cb);
- }
- } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
- // Multiple specific blocks
- auto hash = io::BlockFileCache::hash(path);
- auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
- if (cache == nullptr) {
- g_file_cache_get_by_peer_failed_num << 1;
- response->mutable_status()->add_error_msgs("can't get file cache
instance");
-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ g_file_cache_get_by_peer_num << 1;
+
+ if (!config::enable_file_cache) {
+ LOG_WARNING("try to access file cache data, but file cache not
enabled");
return;
}
- io::CacheContext ctx {};
- // ensure a valid stats pointer is provided to cache layer
- io::ReadStatistics local_stats;
- ctx.stats = &local_stats;
- for (const auto& cb_req : request->cache_req()) {
- size_t offset = static_cast<size_t>(std::max<int64_t>(0,
cb_req.block_offset()));
- size_t size = static_cast<size_t>(std::max<int64_t>(0,
cb_req.block_size()));
- auto holder = cache->get_or_set(hash, offset, size, ctx);
- for (auto& fb : holder.file_blocks) {
- auto state = fb->state();
- if (state != io::FileBlock::State::DOWNLOADED) {
- g_file_cache_get_by_peer_failed_num << 1;
- LOG(WARNING) << "read cache block failed, state=" << state;
- response->mutable_status()->add_error_msgs("read cache
file error");
-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
- return;
- }
- g_file_cache_get_by_peer_blocks_num << 1;
- doris::CacheBlockPB* out = response->add_datas();
- out->set_block_offset(static_cast<int64_t>(fb->offset()));
- out->set_block_size(static_cast<int64_t>(fb->range().size()));
- std::string data;
- data.resize(fb->range().size());
- // Offload the file block read to a dedicated OS thread to
avoid bthread IO
- Status read_st = Status::OK();
- // due to file_reader.cpp:33] Check failed: bthread_self() == 0
- int64_t begin_read_file_ts =
- std::chrono::duration_cast<std::chrono::microseconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
- auto task = [&] {
- // Current thread not exist ThreadContext, usually after
the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext
and bind a Task.
-
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
- Slice slice(data.data(), data.size());
- read_st = fb->read(slice, /*read_offset=*/0);
- };
- AsyncIO::run_task(task, io::FileSystemType::LOCAL);
- int64_t end_read_file_ts =
- std::chrono::duration_cast<std::chrono::microseconds>(
+
+ auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- g_file_cache_get_by_peer_read_cache_file_latency
- << (end_read_file_ts - begin_read_file_ts);
- if (read_st.ok()) {
- out->set_data(std::move(data));
- } else {
- g_file_cache_get_by_peer_failed_num << 1;
- LOG(WARNING) << "read cache block failed: " << read_st;
- response->mutable_status()->add_error_msgs("read cache
file error");
-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
- return;
- }
- }
+
+ const auto type = request->type();
+ const auto& path = request->path();
+ response->mutable_status()->set_status_code(TStatusCode::OK);
+
+ Status status = Status::OK();
+ if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+ status = handle_peer_file_range_request(path, response);
+ } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+ status = handle_peer_file_cache_block_request(request, response);
Review Comment:
`handle_peer_file_cache_block_request` and `handle_peer_file_range_request` can be combined into a single function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]