This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8c486d01065 branch-3.0: [fix](cloud) add log for warm up task #52946 (#53959)
8c486d01065 is described below
commit 8c486d010656cb1ad368afbbf0c7c981f87a02f4
Author: deardeng <[email protected]>
AuthorDate: Tue Jul 29 23:01:27 2025 +0800
branch-3.0: [fix](cloud) add log for warm up task #52946 (#53959)
cherry pick from #52946
---
be/src/cloud/cloud_backend_service.cpp | 29 ++++++++++++++++++++
be/src/cloud/cloud_internal_service.cpp | 3 +++
be/src/io/cache/block_file_cache_downloader.cpp | 35 ++++++++++++++++++++++---
3 files changed, 64 insertions(+), 3 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp
index a50d0e36419..06617bcbbac 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -167,6 +167,16 @@ void
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse&
response,
const TWarmUpCacheAsyncRequest&
request) {
+ std::ostringstream oss;
+ oss << "[";
+ for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
+ if (i > 0) oss << ",";
+ oss << request.tablet_ids[i];
+ }
+ oss << "]";
+ LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
<< request.brpc_port
+ << ", tablets num=" << request.tablet_ids.size() << ",
tablet_ids=" << oss.str();
+
std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
@@ -186,6 +196,7 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
+ LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr
" << brpc_addr;
return;
}
brpc::Controller cntl;
@@ -193,7 +204,10 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
[&](int64_t tablet_id) {
brpc_request.add_tablet_ids(tablet_id); });
PGetFileCacheMetaResponse brpc_response;
+
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request,
&brpc_response, nullptr);
+ VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
+ << ", response=" << brpc_response.DebugString();
if (!cntl.Failed()) {
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
@@ -201,6 +215,8 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
std::move(*brpc_response.mutable_file_cache_block_metas()));
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
+ LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" <<
brpc_addr
+ << ", error=" << cntl.ErrorText();
}
st.to_thrift(&t_status);
response.status = t_status;
@@ -208,6 +224,15 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
void
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse&
response,
const
TCheckWarmUpCacheAsyncRequest& request) {
+ std::ostringstream oss;
+ oss << "[";
+ for (size_t i = 0; i < request.tablets.size() && i < 10; ++i) {
+ if (i > 0) oss << ",";
+ oss << request.tablets[i];
+ }
+ oss << "]";
+ LOG(INFO) << "check_warm_up_cache_async: enter, request tablets num=" <<
request.tablets.size()
+ << ", tablet_ids=" << oss.str();
std::map<int64_t, bool> task_done;
_engine.file_cache_block_downloader().check_download_task(request.tablets,
&task_done);
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false",
{
@@ -217,6 +242,10 @@ void
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
});
response.__set_task_done(task_done);
+ for (const auto& [tablet_id, done] : task_done) {
+ VLOG_DEBUG << "check_warm_up_cache_async: tablet_id=" << tablet_id <<
", done=" << done;
+ }
+
Status st = Status::OK();
TStatus t_status;
st.to_thrift(&t_status);
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 72267252aa6..02dab747c33 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -73,6 +73,7 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
LOG_WARNING("try to access tablet file cache meta, but file cache not
enabled");
return;
}
+ LOG(INFO) << "warm up get meta from this be, tablets num=" <<
request->tablet_ids().size();
for (const auto& tablet_id : request->tablet_ids()) {
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
if (!res.has_value()) {
@@ -105,6 +106,8 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
}
});
}
+ VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
+ << ", response=" << response->DebugString();
}
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_num(
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 1732197e5b4..96f507816a6 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -45,6 +45,7 @@ bvar::Adder<uint64_t>
g_file_cache_download_finished_size("file_cache_download_f
bvar::Adder<uint64_t>
g_file_cache_download_submitted_num("file_cache_download_submitted_num");
bvar::Adder<uint64_t>
g_file_cache_download_finished_num("file_cache_download_finished_num");
bvar::Adder<uint64_t>
g_file_cache_download_failed_num("file_cache_download_failed_num");
+bvar::Adder<uint64_t>
block_file_cache_downloader_task_total("file_cache_downloader_queue_total");
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine)
: _engine(engine) {
_poller = std::thread(&FileCacheBlockDownloader::polling_download_task,
this);
@@ -81,6 +82,8 @@ void
FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
std::lock_guard lock(_inflight_mtx);
for (auto& meta : std::get<0>(task.task_message)) {
++_inflight_tablets[meta.tablet_id()];
+ LOG(INFO) << "submit_download_task: inflight_tablets[" <<
meta.tablet_id()
+ << "] = " << _inflight_tablets[meta.tablet_id()];
if (meta.size() > 0) {
g_file_cache_download_submitted_size << meta.size();
}
@@ -103,9 +106,14 @@ void
FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
}
g_file_cache_download_failed_num << 1;
}
+ LOG(INFO) << "submit_download_task: task queue full, pop front";
_task_queue.pop_front(); // Eliminate the earliest task in the
queue
+ block_file_cache_downloader_task_total << -1;
}
+ VLOG_DEBUG << "submit_download_task: push task, queue size before
push: "
+ << _task_queue.size();
_task_queue.push_back(std::move(task));
+ block_file_cache_downloader_task_total << 1;
_empty.notify_all();
}
g_file_cache_download_submitted_num << 1;
@@ -119,16 +127,21 @@ void FileCacheBlockDownloader::polling_download_task() {
std::unique_lock lock(_mtx);
_empty.wait(lock, [this]() { return !_task_queue.empty() ||
_closed; });
if (_closed) {
+ LOG(INFO) << "polling_download_task: downloader closed, exit
polling";
break;
}
task = std::move(_task_queue.front());
_task_queue.pop_front();
+ block_file_cache_downloader_task_total << -1;
+ VLOG_DEBUG << "polling_download_task: pop task, queue size after
pop: "
+ << _task_queue.size();
}
if
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now()
-
task.atime)
.count() < hot_interval) {
+ VLOG_DEBUG << "polling_download_task: submit download_blocks to
thread pool";
auto st = _workers->submit_func(
[this, task_ = std::move(task)]() mutable {
download_blocks(task_); });
if (!st.ok()) {
@@ -159,6 +172,9 @@ std::unordered_map<std::string, RowsetMetaSharedPtr>
snapshot_rs_metas(BaseTable
void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
+ VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" <<
meta.tablet_id()
+ << ", rowset_id=" << meta.rowset_id() << ", segment_id=" <<
meta.segment_id()
+ << ", offset=" << meta.offset() << ", size=" << meta.size();
CloudTabletSPtr tablet;
if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(),
false); !res.has_value()) {
LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : "
<< res.error();
@@ -170,6 +186,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
if (find_it == id_to_rowset_meta_map.end()) {
+ LOG(WARNING) << "download_file_cache_block: tablet_id=" <<
meta.tablet_id()
+ << "rowset_id not found, rowset_id=" <<
meta.rowset_id();
return;
}
@@ -179,7 +197,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
return;
}
- auto download_done = [&, tablet_id = meta.tablet_id()](Status) {
+ auto download_done = [&, tablet_id = meta.tablet_id()](Status st) {
std::lock_guard lock(_inflight_mtx);
auto it = _inflight_tablets.find(tablet_id);
TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
@@ -187,11 +205,17 @@ void FileCacheBlockDownloader::download_file_cache_block(
LOG(WARNING) << "inflight ref cnt not exist, tablet id " <<
tablet_id;
} else {
it->second--;
+ VLOG_DEBUG << "download_file_cache_block: inflight_tablets["
<< tablet_id
+ << "] = " << it->second;
if (it->second <= 0) {
DCHECK_EQ(it->second, 0) << it->first;
_inflight_tablets.erase(it);
+ VLOG_DEBUG << "download_file_cache_block: erase
inflight_tablets[" << tablet_id
+ << "]";
}
}
+ LOG(INFO) << "download_file_cache_block: download_done,
tablet_Id=" << tablet_id
+ << "status=" << st.to_string();
};
DownloadFileMeta download_meta {
@@ -215,6 +239,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
}
void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta&
meta) {
+ LOG(INFO) << "download_segment_file: start, path=" << meta.path << ",
offset=" << meta.offset
+ << ", download_size=" << meta.download_size << ", file_size=" <<
meta.file_size;
FileReaderSPtr file_reader;
FileReaderOptions opts {
.cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
@@ -224,7 +250,7 @@ void FileCacheBlockDownloader::download_segment_file(const
DownloadFileMeta& met
};
auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
if (!st.ok()) {
- LOG(WARNING) << "failed to download file: " << st;
+ LOG(WARNING) << "failed to download file path=" << meta.path << ",
st=" << st;
if (meta.download_done) {
meta.download_done(std::move(st));
}
@@ -244,13 +270,15 @@ void
FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
size_t size =
std::min(one_single_task_size,
static_cast<size_t>(meta.download_size - offset));
size_t bytes_read;
+ VLOG_DEBUG << "download_segment_file, path=" << meta.path << ",
read_at offset=" << offset
+ << ", size=" << size;
// TODO(plat1ko):
// 1. Directly append buffer data to file cache
// 2. Provide `FileReader::async_read()` interface
DCHECK(meta.ctx.is_dryrun ==
config::enable_reader_dryrun_when_download_file_cache);
auto st = file_reader->read_at(offset, {buffer.get(), size},
&bytes_read, &meta.ctx);
if (!st.ok()) {
- LOG(WARNING) << "failed to download file: " << st;
+ LOG(WARNING) << "failed to download file path=" << meta.path << ",
st=" << st;
if (meta.download_done) {
meta.download_done(std::move(st));
}
@@ -261,6 +289,7 @@ void FileCacheBlockDownloader::download_segment_file(const
DownloadFileMeta& met
}
if (meta.download_done) {
+ LOG(INFO) << "download_segment_file: download finished, path=" <<
meta.path;
meta.download_done(Status::OK());
}
g_file_cache_download_finished_num << 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]