This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77380ff1e2d [fix](cloud)Fix read from peer use thread pool not asyncio 
(#57587)
77380ff1e2d is described below

commit 77380ff1e2da51ae042fd3a965193a8f5ff73087
Author: deardeng <[email protected]>
AuthorDate: Sun Nov 2 19:33:08 2025 +0800

    [fix](cloud)Fix read from peer use thread pool not asyncio (#57587)
    
    Related PR: https://github.com/apache/doris/pull/56384
    This commit also changes the default value of the
    `enable_cache_read_from_peer` configuration from false to true.
---
 be/src/cloud/cloud_internal_service.cpp            | 225 ++++++++++++---------
 be/src/cloud/config.cpp                            |   2 +-
 be/src/io/cache/block_file_cache_downloader.cpp    |   8 +
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |   4 +-
 .../cloud_p0/balance/test_balance_metrics.groovy   |   2 +-
 .../balance/test_peer_read_async_warmup.groovy     |   1 +
 .../test_drop_cluster_clean_metrics.groovy         |   2 +-
 .../test_fe_tablet_same_backend.groovy             |   2 +-
 .../cloud_p0/multi_cluster/test_rebalance.groovy   |   6 +-
 10 files changed, 152 insertions(+), 102 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 94bb951b95d..957a3febc07 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -170,110 +170,151 @@ void 
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
                << ", response=" << response->DebugString();
 }
 
+namespace {
+// Helper functions for fetch_peer_data
+
+Status handle_peer_file_range_request(const std::string& path, 
PFetchPeerDataResponse* response) {
+    // Read specific range [file_offset, file_offset+file_size) across cached 
blocks
+    auto datas = 
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+    for (auto& cb : datas) {
+        *(response->add_datas()) = std::move(cb);
+    }
+    return Status::OK();
+}
+
+void set_error_response(PFetchPeerDataResponse* response, const std::string& 
error_msg) {
+    response->mutable_status()->add_error_msgs(error_msg);
+    response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+}
+
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
+                       doris::CacheBlockPB* output) {
+    std::string data;
+    data.resize(file_block->range().size());
+
+    auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                      
std::chrono::steady_clock::now().time_since_epoch())
+                                      .count();
+
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+    Slice slice(data.data(), data.size());
+    Status read_st = file_block->read(slice, /*read_offset=*/0);
+
+    auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                    
std::chrono::steady_clock::now().time_since_epoch())
+                                    .count();
+    g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - 
begin_read_file_ts);
+
+    if (read_st.ok()) {
+        output->set_data(std::move(data));
+        return Status::OK();
+    } else {
+        g_file_cache_get_by_peer_failed_num << 1;
+        LOG(WARNING) << "read cache block failed: " << read_st;
+        return read_st;
+    }
+}
+
+Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* 
request,
+                                            PFetchPeerDataResponse* response) {
+    const auto& path = request->path();
+    auto hash = io::BlockFileCache::hash(path);
+    auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+    if (cache == nullptr) {
+        g_file_cache_get_by_peer_failed_num << 1;
+        set_error_response(response, "can't get file cache instance");
+        return Status::InternalError("can't get file cache instance");
+    }
+
+    io::CacheContext ctx {};
+    io::ReadStatistics local_stats;
+    ctx.stats = &local_stats;
+
+    for (const auto& cb_req : request->cache_req()) {
+        size_t offset = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_offset()));
+        size_t size = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_size()));
+        auto holder = cache->get_or_set(hash, offset, size, ctx);
+
+        for (auto& fb : holder.file_blocks) {
+            if (fb->state() != io::FileBlock::State::DOWNLOADED) {
+                g_file_cache_get_by_peer_failed_num << 1;
+                LOG(WARNING) << "read cache block failed, state=" << 
fb->state();
+                set_error_response(response, "read cache file error");
+                return Status::InternalError("cache block not downloaded");
+            }
+
+            g_file_cache_get_by_peer_blocks_num << 1;
+            doris::CacheBlockPB* out = response->add_datas();
+            out->set_block_offset(static_cast<int64_t>(fb->offset()));
+            out->set_block_size(static_cast<int64_t>(fb->range().size()));
+
+            Status read_status = read_file_block(fb, out);
+            if (!read_status.ok()) {
+                set_error_response(response, "read cache file error");
+                return read_status;
+            }
+        }
+    }
+
+    return Status::OK();
+}
+} // namespace
+
 void 
CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* 
controller
                                                [[maybe_unused]],
                                                const PFetchPeerDataRequest* 
request,
                                                PFetchPeerDataResponse* 
response,
                                                google::protobuf::Closure* 
done) {
-    // TODO(dx): use async thread pool to handle the request, not AsyncIO
-    brpc::ClosureGuard closure_guard(done);
-    g_file_cache_get_by_peer_num << 1;
-    if (!config::enable_file_cache) {
-        LOG_WARNING("try to access file cache data, but file cache not 
enabled");
-        return;
-    }
-    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                               
std::chrono::steady_clock::now().time_since_epoch())
-                               .count();
-    const auto type = request->type();
-    const auto& path = request->path();
-    response->mutable_status()->set_status_code(TStatusCode::OK);
-    if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
-        // Read specific range [file_offset, file_offset+file_size) across 
cached blocks
-        auto datas = 
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
-        for (auto& cb : datas) {
-            *(response->add_datas()) = std::move(cb);
-        }
-    } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
-        // Multiple specific blocks
-        auto hash = io::BlockFileCache::hash(path);
-        auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
-        if (cache == nullptr) {
-            g_file_cache_get_by_peer_failed_num << 1;
-            response->mutable_status()->add_error_msgs("can't get file cache 
instance");
-            
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        g_file_cache_get_by_peer_num << 1;
+
+        if (!config::enable_file_cache) {
+            LOG_WARNING("try to access file cache data, but file cache not 
enabled");
             return;
         }
-        io::CacheContext ctx {};
-        // ensure a valid stats pointer is provided to cache layer
-        io::ReadStatistics local_stats;
-        ctx.stats = &local_stats;
-        for (const auto& cb_req : request->cache_req()) {
-            size_t offset = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_offset()));
-            size_t size = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_size()));
-            auto holder = cache->get_or_set(hash, offset, size, ctx);
-            for (auto& fb : holder.file_blocks) {
-                auto state = fb->state();
-                if (state != io::FileBlock::State::DOWNLOADED) {
-                    g_file_cache_get_by_peer_failed_num << 1;
-                    LOG(WARNING) << "read cache block failed, state=" << state;
-                    response->mutable_status()->add_error_msgs("read cache 
file error");
-                    
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
-                    return;
-                }
-                g_file_cache_get_by_peer_blocks_num << 1;
-                doris::CacheBlockPB* out = response->add_datas();
-                out->set_block_offset(static_cast<int64_t>(fb->offset()));
-                out->set_block_size(static_cast<int64_t>(fb->range().size()));
-                std::string data;
-                data.resize(fb->range().size());
-                // Offload the file block read to a dedicated OS thread to 
avoid bthread IO
-                Status read_st = Status::OK();
-                // due to file_reader.cpp:33] Check failed: bthread_self() == 0
-                int64_t begin_read_file_ts =
-                        std::chrono::duration_cast<std::chrono::microseconds>(
-                                
std::chrono::steady_clock::now().time_since_epoch())
-                                .count();
-                auto task = [&] {
-                    // Current thread not exist ThreadContext, usually after 
the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext 
and bind a Task.
-                    
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
-                    Slice slice(data.data(), data.size());
-                    read_st = fb->read(slice, /*read_offset=*/0);
-                };
-                AsyncIO::run_task(task, io::FileSystemType::LOCAL);
-                int64_t end_read_file_ts =
-                        std::chrono::duration_cast<std::chrono::microseconds>(
+
+        auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                 
std::chrono::steady_clock::now().time_since_epoch())
                                 .count();
-                g_file_cache_get_by_peer_read_cache_file_latency
-                        << (end_read_file_ts - begin_read_file_ts);
-                if (read_st.ok()) {
-                    out->set_data(std::move(data));
-                } else {
-                    g_file_cache_get_by_peer_failed_num << 1;
-                    LOG(WARNING) << "read cache block failed: " << read_st;
-                    response->mutable_status()->add_error_msgs("read cache 
file error");
-                    
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
-                    return;
-                }
-            }
+
+        const auto type = request->type();
+        const auto& path = request->path();
+        response->mutable_status()->set_status_code(TStatusCode::OK);
+
+        Status status = Status::OK();
+        if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+            status = handle_peer_file_range_request(path, response);
+        } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+            status = handle_peer_file_cache_block_request(request, response);
         }
-    }
-    DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
-        int st_us = dp->param<int>("sleep", 1000);
-        
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", 
st_us);
-        // sleep us
-        bthread_usleep(st_us);
-    });
 
-    int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                             
std::chrono::steady_clock::now().time_since_epoch())
-                             .count();
-    g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
-    g_file_cache_get_by_peer_success_num << 1;
+        if (!status.ok()) {
+            LOG(WARNING) << "fetch peer data failed: " << status.to_string();
+            set_error_response(response, status.to_string());
+        }
 
-    VLOG_DEBUG << "fetch cache request=" << request->DebugString()
-               << ", response=" << response->DebugString();
+        DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+            int st_us = dp->param<int>("sleep", 1000);
+            
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", 
st_us);
+            bthread_usleep(st_us);
+        });
+
+        auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                              
std::chrono::steady_clock::now().time_since_epoch())
+                              .count();
+        g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
+        g_file_cache_get_by_peer_success_num << 1;
+
+        VLOG_DEBUG << "fetch cache request=" << request->DebugString()
+                   << ", response=" << response->DebugString();
+    });
+
+    if (!ret) {
+        brpc::ClosureGuard closure_guard(done);
+        LOG(WARNING) << "fail to offer fetch peer data request to the work 
pool, pool="
+                     << _heavy_work_pool.get_info();
+    }
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index b7a504c4bac..4ac4254475c 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -133,7 +133,7 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
 
 DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
 
-DEFINE_mBool(enable_cache_read_from_peer, "false");
+DEFINE_mBool(enable_cache_read_from_peer, "true");
 
 // Cache the expiration time of the peer address.
 // This can be configured to be less than the 
`rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp 
b/be/src/io/cache/block_file_cache_downloader.cpp
index 4e9ffb2bc5a..a529a8df625 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -318,6 +318,14 @@ void FileCacheBlockDownloader::download_segment_file(const 
DownloadFileMeta& met
 
     std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
 
+    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
+        auto sleep_time = 
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "FileCacheBlockDownloader::download_segment_file_sleep", 
"sleep_time", 3);
+        LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: 
sleep_time="
+                  << sleep_time;
+        sleep(sleep_time);
+    });
+
     size_t task_offset = 0;
     for (size_t i = 0; i < task_num; i++) {
         size_t offset = meta.offset + task_offset;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c7a6a44b123..d8c89648e26 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3339,7 +3339,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int cloud_min_balance_tablet_num_per_run = 2;
 
-    @ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
+    @ConfField(mutable = true, masterOnly = true, description = 
{"指定存算分离模式下所有Compute group的扩缩容预热方式。"
             + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
             + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
             + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 2ed084911c1..f6d1a58c9d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -980,7 +980,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
             } catch (Exception e) {
                 LOG.warn("Failed to preheat tablet {} from {} to {}, "
-                                + "help msg turn off fe config 
enable_cloud_warm_up_for_rebalance",
+                                + "help msg change fe config 
cloud_warm_up_for_rebalance_type to without_warmup, ",
                         task.pickedTablet.getId(), task.srcBe, task.destBe, e);
             }
         }
@@ -1278,7 +1278,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             sendPreHeatingRpc(pickedTablet, srcBe, destBe);
         } catch (Exception e) {
             LOG.warn("Failed to preheat tablet {} from {} to {}, "
-                    + "help msg turn off fe config 
enable_cloud_warm_up_for_rebalance",
+                    + "help msg change fe config 
cloud_warm_up_for_rebalance_type to without_warmup ",
                     pickedTablet.getId(), srcBe, destBe, e);
             return;
         }
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy 
b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
index 3a31cd08dc1..eb81ad69524 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
@@ -29,7 +29,7 @@ suite('test_balance_metrics', 'docker') {
         'sys_log_verbose_modules=org',
         'heartbeat_interval_second=1',
         'rehash_tablet_after_be_dead_seconds=3600',
-        'enable_cloud_warm_up_for_rebalance=false'
+        'cloud_warm_up_for_rebalance_type=without_warmup'
     ]
     options.beConfigs += [
         'report_tablet_interval_seconds=1',
diff --git 
a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy 
b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
index 8ce71606542..4843c8e6773 100644
--- a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
@@ -38,6 +38,7 @@ suite('test_peer_read_async_warmup', 'docker') {
         'schedule_sync_tablets_interval_s=18000',
         'disable_auto_compaction=true',
         'sys_log_verbose_modules=*',
+        'enable_cache_read_from_peer=true',
     ]
     options.setFeNum(1)
     options.setBeNum(1)
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
 
b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
index 91410e246c1..d5f2e24c50c 100644
--- 
a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
+++ 
b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
@@ -29,7 +29,7 @@ suite('test_drop_cluster_clean_metrics', 'docker') {
         'sys_log_verbose_modules=org',
         'heartbeat_interval_second=1',
         'rehash_tablet_after_be_dead_seconds=3600',
-        'enable_cloud_warm_up_for_rebalance=false'
+        'cloud_warm_up_for_rebalance_type=without_warmup'
     ]
     options.beConfigs += [
         'report_tablet_interval_seconds=1',
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
 
b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 5ecc610bf5a..9f24d1b2dbf 100644
--- 
a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ 
b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -107,7 +107,7 @@ suite('test_fe_tablet_same_backend', 
'multi_cluster,docker') {
     def options = new ClusterOptions()
     options.feConfigs += [
         'cloud_cluster_check_interval_second=1',
-        'enable_cloud_warm_up_for_rebalance=true',
+        'cloud_warm_up_for_rebalance_type=async_warmup',
         'cloud_tablet_rebalancer_interval_second=1',
         'cloud_balance_tablet_percent_per_run=1.0',
     ]
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 83a8dc336de..81f2227ce44 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -40,8 +40,8 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
             'sys_log_verbose_modules=org',
         ]
     }
-    clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 
            'cloud_pre_heating_time_limit_sec=300']
-    clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
+    clusterOptions[0].feConfigs += 
['cloud_warm_up_for_rebalance_type=sync_warmup','cloud_pre_heating_time_limit_sec=300']
+    clusterOptions[1].feConfigs += 
['cloud_warm_up_for_rebalance_type=without_warmup']
 
 
     for (int i = 0; i < clusterOptions.size(); i++) {
@@ -178,7 +178,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
             // add a be
             cluster.addBackend(1, null)
             // warm up
-            sql """admin set frontend 
config("enable_cloud_warm_up_for_rebalance"="true")"""
+            sql """admin set frontend 
config("cloud_warm_up_for_rebalance_type"="sync_warmup")"""
 
             // test rebalance thread still work
             awaitUntil(30) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to