This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8c486d01065 branch-3.0: [fix](cloud) add log for warm up task  #52946 (#53959)
8c486d01065 is described below

commit 8c486d010656cb1ad368afbbf0c7c981f87a02f4
Author: deardeng <[email protected]>
AuthorDate: Tue Jul 29 23:01:27 2025 +0800

    branch-3.0: [fix](cloud) add log for warm up task  #52946 (#53959)
    
    cherry pick from #52946
---
 be/src/cloud/cloud_backend_service.cpp          | 29 ++++++++++++++++++++
 be/src/cloud/cloud_internal_service.cpp         |  3 +++
 be/src/io/cache/block_file_cache_downloader.cpp | 35 ++++++++++++++++++++++---
 3 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/be/src/cloud/cloud_backend_service.cpp 
b/be/src/cloud/cloud_backend_service.cpp
index a50d0e36419..06617bcbbac 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -167,6 +167,16 @@ void 
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
 
 void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& 
response,
                                               const TWarmUpCacheAsyncRequest& 
request) {
+    std::ostringstream oss;
+    oss << "[";
+    for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
+        if (i > 0) oss << ",";
+        oss << request.tablet_ids[i];
+    }
+    oss << "]";
+    LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" 
<< request.brpc_port
+              << ", tablets num=" << request.tablet_ids.size() << ", 
tablet_ids=" << oss.str();
+
     std::string host = request.host;
     auto dns_cache = ExecEnv::GetInstance()->dns_cache();
     if (dns_cache == nullptr) {
@@ -186,6 +196,7 @@ void 
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
             
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
     if (!brpc_stub) {
         st = Status::RpcError("Address {} is wrong", brpc_addr);
+        LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr 
" << brpc_addr;
         return;
     }
     brpc::Controller cntl;
@@ -193,7 +204,10 @@ void 
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
     std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
                   [&](int64_t tablet_id) { 
brpc_request.add_tablet_ids(tablet_id); });
     PGetFileCacheMetaResponse brpc_response;
+
     brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, 
&brpc_response, nullptr);
+    VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
+               << ", response=" << brpc_response.DebugString();
     if (!cntl.Failed()) {
         g_file_cache_warm_up_cache_async_submitted_segment_num
                 << brpc_response.file_cache_block_metas().size();
@@ -201,6 +215,8 @@ void 
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
                 std::move(*brpc_response.mutable_file_cache_block_metas()));
     } else {
         st = Status::RpcError("{} isn't connected", brpc_addr);
+        LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << 
brpc_addr
+                     << ", error=" << cntl.ErrorText();
     }
     st.to_thrift(&t_status);
     response.status = t_status;
@@ -208,6 +224,15 @@ void 
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
 
 void 
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& 
response,
                                                     const 
TCheckWarmUpCacheAsyncRequest& request) {
+    std::ostringstream oss;
+    oss << "[";
+    for (size_t i = 0; i < request.tablets.size() && i < 10; ++i) {
+        if (i > 0) oss << ",";
+        oss << request.tablets[i];
+    }
+    oss << "]";
+    LOG(INFO) << "check_warm_up_cache_async: enter, request tablets num=" << 
request.tablets.size()
+              << ", tablet_ids=" << oss.str();
     std::map<int64_t, bool> task_done;
     _engine.file_cache_block_downloader().check_download_task(request.tablets, 
&task_done);
     
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false",
 {
@@ -217,6 +242,10 @@ void 
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
     });
     response.__set_task_done(task_done);
 
+    for (const auto& [tablet_id, done] : task_done) {
+        VLOG_DEBUG << "check_warm_up_cache_async: tablet_id=" << tablet_id << 
", done=" << done;
+    }
+
     Status st = Status::OK();
     TStatus t_status;
     st.to_thrift(&t_status);
diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 72267252aa6..02dab747c33 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -73,6 +73,7 @@ void 
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
         LOG_WARNING("try to access tablet file cache meta, but file cache not 
enabled");
         return;
     }
+    LOG(INFO) << "warm up get meta from this be, tablets num=" << 
request->tablet_ids().size();
     for (const auto& tablet_id : request->tablet_ids()) {
         auto res = _engine.tablet_mgr().get_tablet(tablet_id);
         if (!res.has_value()) {
@@ -105,6 +106,8 @@ void 
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
             }
         });
     }
+    VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
+               << ", response=" << response->DebugString();
 }
 
 bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_num(
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp 
b/be/src/io/cache/block_file_cache_downloader.cpp
index 1732197e5b4..96f507816a6 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -45,6 +45,7 @@ bvar::Adder<uint64_t> 
g_file_cache_download_finished_size("file_cache_download_f
 bvar::Adder<uint64_t> 
g_file_cache_download_submitted_num("file_cache_download_submitted_num");
 bvar::Adder<uint64_t> 
g_file_cache_download_finished_num("file_cache_download_finished_num");
 bvar::Adder<uint64_t> 
g_file_cache_download_failed_num("file_cache_download_failed_num");
+bvar::Adder<uint64_t> 
block_file_cache_downloader_task_total("file_cache_downloader_queue_total");
 
 FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) 
: _engine(engine) {
     _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, 
this);
@@ -81,6 +82,8 @@ void 
FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
         std::lock_guard lock(_inflight_mtx);
         for (auto& meta : std::get<0>(task.task_message)) {
             ++_inflight_tablets[meta.tablet_id()];
+            LOG(INFO) << "submit_download_task: inflight_tablets[" << 
meta.tablet_id()
+                      << "] = " << _inflight_tablets[meta.tablet_id()];
             if (meta.size() > 0) {
                 g_file_cache_download_submitted_size << meta.size();
             }
@@ -103,9 +106,14 @@ void 
FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
                 }
                 g_file_cache_download_failed_num << 1;
             }
+            LOG(INFO) << "submit_download_task: task queue full, pop front";
             _task_queue.pop_front(); // Eliminate the earliest task in the 
queue
+            block_file_cache_downloader_task_total << -1;
         }
+        VLOG_DEBUG << "submit_download_task: push task, queue size before 
push: "
+                   << _task_queue.size();
         _task_queue.push_back(std::move(task));
+        block_file_cache_downloader_task_total << 1;
         _empty.notify_all();
     }
     g_file_cache_download_submitted_num << 1;
@@ -119,16 +127,21 @@ void FileCacheBlockDownloader::polling_download_task() {
             std::unique_lock lock(_mtx);
             _empty.wait(lock, [this]() { return !_task_queue.empty() || 
_closed; });
             if (_closed) {
+                LOG(INFO) << "polling_download_task: downloader closed, exit 
polling";
                 break;
             }
 
             task = std::move(_task_queue.front());
             _task_queue.pop_front();
+            block_file_cache_downloader_task_total << -1;
+            VLOG_DEBUG << "polling_download_task: pop task, queue size after 
pop: "
+                       << _task_queue.size();
         }
 
         if 
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now()
 -
                                                              task.atime)
                     .count() < hot_interval) {
+            VLOG_DEBUG << "polling_download_task: submit download_blocks to 
thread pool";
             auto st = _workers->submit_func(
                     [this, task_ = std::move(task)]() mutable { 
download_blocks(task_); });
             if (!st.ok()) {
@@ -159,6 +172,9 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> 
snapshot_rs_metas(BaseTable
 void FileCacheBlockDownloader::download_file_cache_block(
         const DownloadTask::FileCacheBlockMetaVec& metas) {
     std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
+        VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << 
meta.tablet_id()
+                   << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << 
meta.segment_id()
+                   << ", offset=" << meta.offset() << ", size=" << meta.size();
         CloudTabletSPtr tablet;
         if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), 
false); !res.has_value()) {
             LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " 
<< res.error();
@@ -170,6 +186,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
         auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
         auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
         if (find_it == id_to_rowset_meta_map.end()) {
+            LOG(WARNING) << "download_file_cache_block: tablet_id=" << 
meta.tablet_id()
+                         << "rowset_id not found, rowset_id=" << 
meta.rowset_id();
             return;
         }
 
@@ -179,7 +197,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
             return;
         }
 
-        auto download_done = [&, tablet_id = meta.tablet_id()](Status) {
+        auto download_done = [&, tablet_id = meta.tablet_id()](Status st) {
             std::lock_guard lock(_inflight_mtx);
             auto it = _inflight_tablets.find(tablet_id);
             
TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
@@ -187,11 +205,17 @@ void FileCacheBlockDownloader::download_file_cache_block(
                 LOG(WARNING) << "inflight ref cnt not exist, tablet id " << 
tablet_id;
             } else {
                 it->second--;
+                VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" 
<< tablet_id
+                           << "] = " << it->second;
                 if (it->second <= 0) {
                     DCHECK_EQ(it->second, 0) << it->first;
                     _inflight_tablets.erase(it);
+                    VLOG_DEBUG << "download_file_cache_block: erase 
inflight_tablets[" << tablet_id
+                               << "]";
                 }
             }
+            LOG(INFO) << "download_file_cache_block: download_done, 
tablet_Id=" << tablet_id
+                      << "status=" << st.to_string();
         };
 
         DownloadFileMeta download_meta {
@@ -215,6 +239,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
 }
 
 void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& 
meta) {
+    LOG(INFO) << "download_segment_file: start, path=" << meta.path << ", 
offset=" << meta.offset
+              << ", download_size=" << meta.download_size << ", file_size=" << 
meta.file_size;
     FileReaderSPtr file_reader;
     FileReaderOptions opts {
             .cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
@@ -224,7 +250,7 @@ void FileCacheBlockDownloader::download_segment_file(const 
DownloadFileMeta& met
     };
     auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
     if (!st.ok()) {
-        LOG(WARNING) << "failed to download file: " << st;
+        LOG(WARNING) << "failed to download file path=" << meta.path << ", 
st=" << st;
         if (meta.download_done) {
             meta.download_done(std::move(st));
         }
@@ -244,13 +270,15 @@ void 
FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
         size_t size =
                 std::min(one_single_task_size, 
static_cast<size_t>(meta.download_size - offset));
         size_t bytes_read;
+        VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", 
read_at offset=" << offset
+                   << ", size=" << size;
         // TODO(plat1ko):
         //  1. Directly append buffer data to file cache
         //  2. Provide `FileReader::async_read()` interface
         DCHECK(meta.ctx.is_dryrun == 
config::enable_reader_dryrun_when_download_file_cache);
         auto st = file_reader->read_at(offset, {buffer.get(), size}, 
&bytes_read, &meta.ctx);
         if (!st.ok()) {
-            LOG(WARNING) << "failed to download file: " << st;
+            LOG(WARNING) << "failed to download file path=" << meta.path << ", 
st=" << st;
             if (meta.download_done) {
                 meta.download_done(std::move(st));
             }
@@ -261,6 +289,7 @@ void FileCacheBlockDownloader::download_segment_file(const 
DownloadFileMeta& met
     }
 
     if (meta.download_done) {
+        LOG(INFO) << "download_segment_file: download finished, path=" << 
meta.path;
         meta.download_done(Status::OK());
     }
     g_file_cache_download_finished_num << 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to