This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77380ff1e2d [fix](cloud) Fix read from peer to use a thread pool instead of AsyncIO (#57587)
77380ff1e2d is described below
commit 77380ff1e2da51ae042fd3a965193a8f5ff73087
Author: deardeng <[email protected]>
AuthorDate: Sun Nov 2 19:33:08 2025 +0800
[fix](cloud) Fix read from peer to use a thread pool instead of AsyncIO (#57587)
Related PR: https://github.com/apache/doris/pull/56384 modified the
`enable_cache_read_from_peer` configuration from false to true.
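
The core of the change, visible in cloud_internal_service.cpp below, is that
the body of the fetch_peer_data bRPC handler moves into a lambda submitted to
_heavy_work_pool via try_offer, so blocking cache-file reads run on a
dedicated OS thread instead of hopping through AsyncIO from a bthread. A
minimal sketch of the pattern (signature simplified; only the names that
appear in the diff are real):

    void fetch_peer_data(const PFetchPeerDataRequest* request,
                         PFetchPeerDataResponse* response,
                         google::protobuf::Closure* done) {
        bool ret = _heavy_work_pool.try_offer([request, response, done]() {
            // Runs done->Run() when the lambda exits.
            brpc::ClosureGuard closure_guard(done);
            // ... blocking file-cache reads happen here, on a pool thread ...
        });
        if (!ret) {
            // Pool saturated: the lambda never ran, so complete `done` here.
            brpc::ClosureGuard closure_guard(done);
            LOG(WARNING) << "fail to offer fetch peer data request";
        }
    }

A ClosureGuard is constructed on both paths because done->Run() must execute
exactly once: inside the lambda on the normal path, or synchronously when
try_offer is rejected.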
---
be/src/cloud/cloud_internal_service.cpp | 225 ++++++++++++---------
be/src/cloud/config.cpp | 2 +-
be/src/io/cache/block_file_cache_downloader.cpp | 8 +
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 4 +-
.../cloud_p0/balance/test_balance_metrics.groovy | 2 +-
.../balance/test_peer_read_async_warmup.groovy | 1 +
.../test_drop_cluster_clean_metrics.groovy | 2 +-
.../test_fe_tablet_same_backend.groovy | 2 +-
.../cloud_p0/multi_cluster/test_rebalance.groovy | 6 +-
10 files changed, 152 insertions(+), 102 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 94bb951b95d..957a3febc07 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -170,110 +170,151 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
<< ", response=" << response->DebugString();
}
+namespace {
+// Helper functions for fetch_peer_data
+
+Status handle_peer_file_range_request(const std::string& path, PFetchPeerDataResponse* response) {
+ // Read specific range [file_offset, file_offset+file_size) across cached blocks
+ auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+ for (auto& cb : datas) {
+ *(response->add_datas()) = std::move(cb);
+ }
+ return Status::OK();
+}
+
+void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
+ response->mutable_status()->add_error_msgs(error_msg);
+ response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+}
+
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
+ doris::CacheBlockPB* output) {
+ std::string data;
+ data.resize(file_block->range().size());
+
+ auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ Slice slice(data.data(), data.size());
+ Status read_st = file_block->read(slice, /*read_offset=*/0);
+
+ auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);
+
+ if (read_st.ok()) {
+ output->set_data(std::move(data));
+ return Status::OK();
+ } else {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed: " << read_st;
+ return read_st;
+ }
+}
+
+Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
+ PFetchPeerDataResponse* response) {
+ const auto& path = request->path();
+ auto hash = io::BlockFileCache::hash(path);
+ auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+ if (cache == nullptr) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ set_error_response(response, "can't get file cache instance");
+ return Status::InternalError("can't get file cache instance");
+ }
+
+ io::CacheContext ctx {};
+ io::ReadStatistics local_stats;
+ ctx.stats = &local_stats;
+
+ for (const auto& cb_req : request->cache_req()) {
+ size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
+ size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
+ auto holder = cache->get_or_set(hash, offset, size, ctx);
+
+ for (auto& fb : holder.file_blocks) {
+ if (fb->state() != io::FileBlock::State::DOWNLOADED) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed, state=" <<
fb->state();
+ set_error_response(response, "read cache file error");
+ return Status::InternalError("cache block not downloaded");
+ }
+
+ g_file_cache_get_by_peer_blocks_num << 1;
+ doris::CacheBlockPB* out = response->add_datas();
+ out->set_block_offset(static_cast<int64_t>(fb->offset()));
+ out->set_block_size(static_cast<int64_t>(fb->range().size()));
+
+ Status read_status = read_file_block(fb, out);
+ if (!read_status.ok()) {
+ set_error_response(response, "read cache file error");
+ return read_status;
+ }
+ }
+ }
+
+ return Status::OK();
+}
+} // namespace
+
void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
        [[maybe_unused]],
        const PFetchPeerDataRequest* request,
        PFetchPeerDataResponse* response,
        google::protobuf::Closure* done) {
- // TODO(dx): use async thread pool to handle the request, not AsyncIO
- brpc::ClosureGuard closure_guard(done);
- g_file_cache_get_by_peer_num << 1;
- if (!config::enable_file_cache) {
- LOG_WARNING("try to access file cache data, but file cache not
enabled");
- return;
- }
- int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- const auto type = request->type();
- const auto& path = request->path();
- response->mutable_status()->set_status_code(TStatusCode::OK);
- if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
- // Read specific range [file_offset, file_offset+file_size) across cached blocks
- auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
- for (auto& cb : datas) {
- *(response->add_datas()) = std::move(cb);
- }
- } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
- // Multiple specific blocks
- auto hash = io::BlockFileCache::hash(path);
- auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
- if (cache == nullptr) {
- g_file_cache_get_by_peer_failed_num << 1;
- response->mutable_status()->add_error_msgs("can't get file cache
instance");
-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ g_file_cache_get_by_peer_num << 1;
+
+ if (!config::enable_file_cache) {
+ LOG_WARNING("try to access file cache data, but file cache not
enabled");
return;
}
- io::CacheContext ctx {};
- // ensure a valid stats pointer is provided to cache layer
- io::ReadStatistics local_stats;
- ctx.stats = &local_stats;
- for (const auto& cb_req : request->cache_req()) {
- size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
- size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
- auto holder = cache->get_or_set(hash, offset, size, ctx);
- for (auto& fb : holder.file_blocks) {
- auto state = fb->state();
- if (state != io::FileBlock::State::DOWNLOADED) {
- g_file_cache_get_by_peer_failed_num << 1;
- LOG(WARNING) << "read cache block failed, state=" << state;
- response->mutable_status()->add_error_msgs("read cache
file error");
-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
- return;
- }
- g_file_cache_get_by_peer_blocks_num << 1;
- doris::CacheBlockPB* out = response->add_datas();
- out->set_block_offset(static_cast<int64_t>(fb->offset()));
- out->set_block_size(static_cast<int64_t>(fb->range().size()));
- std::string data;
- data.resize(fb->range().size());
- // Offload the file block read to a dedicated OS thread to avoid bthread IO
- Status read_st = Status::OK();
- // due to file_reader.cpp:33] Check failed: bthread_self() == 0
- int64_t begin_read_file_ts =
- std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- auto task = [&] {
- // Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task.
- SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
- Slice slice(data.data(), data.size());
- read_st = fb->read(slice, /*read_offset=*/0);
- };
- AsyncIO::run_task(task, io::FileSystemType::LOCAL);
- int64_t end_read_file_ts =
- std::chrono::duration_cast<std::chrono::microseconds>(
+
+ auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- g_file_cache_get_by_peer_read_cache_file_latency
- << (end_read_file_ts - begin_read_file_ts);
- if (read_st.ok()) {
- out->set_data(std::move(data));
- } else {
- g_file_cache_get_by_peer_failed_num << 1;
- LOG(WARNING) << "read cache block failed: " << read_st;
- response->mutable_status()->add_error_msgs("read cache
file error");
-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
- return;
- }
- }
+
+ const auto type = request->type();
+ const auto& path = request->path();
+ response->mutable_status()->set_status_code(TStatusCode::OK);
+
+ Status status = Status::OK();
+ if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+ status = handle_peer_file_range_request(path, response);
+ } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+ status = handle_peer_file_cache_block_request(request, response);
}
- }
- DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
- int st_us = dp->param<int>("sleep", 1000);
- LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
- // sleep us
- bthread_usleep(st_us);
- });
- int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
- g_file_cache_get_by_peer_success_num << 1;
+ if (!status.ok()) {
+ LOG(WARNING) << "fetch peer data failed: " << status.to_string();
+ set_error_response(response, status.to_string());
+ }
- VLOG_DEBUG << "fetch cache request=" << request->DebugString()
- << ", response=" << response->DebugString();
+ DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+ int st_us = dp->param<int>("sleep", 1000);
+ LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
+ bthread_usleep(st_us);
+ });
+
+ auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
+ g_file_cache_get_by_peer_success_num << 1;
+
+ VLOG_DEBUG << "fetch cache request=" << request->DebugString()
+ << ", response=" << response->DebugString();
+ });
+
+ if (!ret) {
+ brpc::ClosureGuard closure_guard(done);
+ LOG(WARNING) << "fail to offer fetch peer data request to the work
pool, pool="
+ << _heavy_work_pool.get_info();
+ }
}
#include "common/compile_check_end.h"
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index b7a504c4bac..4ac4254475c 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -133,7 +133,7 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
-DEFINE_mBool(enable_cache_read_from_peer, "false");
+DEFINE_mBool(enable_cache_read_from_peer, "true");
// Cache the expiration time of the peer address.
// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 4e9ffb2bc5a..a529a8df625 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -318,6 +318,14 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
+ DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
+ auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
+ LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time="
+ << sleep_time;
+ sleep(sleep_time);
+ });
+
size_t task_offset = 0;
for (size_t i = 0; i < task_num; i++) {
size_t offset = meta.offset + task_offset;
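
The debug point added above follows the DBUG_EXECUTE_IF convention already
used in fetch_peer_data_slower earlier in this commit: the block executes only
when the named point is activated, and parameters are read with a default. A
minimal sketch of the convention (the point name here is hypothetical):

    // Sketch of a Doris debug point guarding a test-only delay.
    // "my_component::my_point" is a made-up name; "sleep" is a parameter
    // supplied when the point is activated, defaulting to 1000.
    DBUG_EXECUTE_IF("my_component::my_point", {
        int st_us = dp->param<int>("sleep", 1000); // microseconds
        LOG(WARNING) << "debug point hit, sleeping " << st_us << " us";
        bthread_usleep(st_us); // yields the bthread rather than blocking it
    });

Presumably the new download_segment_file_sleep point exists so tests such as
test_peer_read_async_warmup.groovy below can slow the downloader enough to
exercise the peer-read path while warm-up is still in flight.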
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c7a6a44b123..d8c89648e26 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3339,7 +3339,7 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int cloud_min_balance_tablet_num_per_run = 2;
- @ConfField(description = {"Specifies the warm-up mode for scaling all compute groups in compute-storage decoupled mode. "
+ @ConfField(mutable = true, masterOnly = true, description = {"Specifies the warm-up mode for scaling all compute groups in compute-storage decoupled mode. "
+ "without_warmup: directly updates the tablet shard mapping; first reads pull from S3; fastest rebalancing but the largest performance jitter; "
+ "async_warmup: asynchronous warm-up that pulls the cache best-effort; faster rebalancing but cache misses are possible; "
+ "sync_warmup: synchronous warm-up that ensures cache migration completes; slower rebalancing but no cache misses; "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 2ed084911c1..f6d1a58c9d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -980,7 +980,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
} catch (Exception e) {
LOG.warn("Failed to preheat tablet {} from {} to {}, "
- + "help msg turn off fe config
enable_cloud_warm_up_for_rebalance",
+ + "help msg change fe config
cloud_warm_up_for_rebalance_type to without_warmup, ",
task.pickedTablet.getId(), task.srcBe, task.destBe, e);
}
}
@@ -1278,7 +1278,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
sendPreHeatingRpc(pickedTablet, srcBe, destBe);
} catch (Exception e) {
LOG.warn("Failed to preheat tablet {} from {} to {}, "
- + "help msg turn off fe config
enable_cloud_warm_up_for_rebalance",
+ + "help msg change fe config
cloud_warm_up_for_rebalance_type to without_warmup ",
pickedTablet.getId(), srcBe, destBe, e);
return;
}
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
index 3a31cd08dc1..eb81ad69524 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
@@ -29,7 +29,7 @@ suite('test_balance_metrics', 'docker') {
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1',
'rehash_tablet_after_be_dead_seconds=3600',
- 'enable_cloud_warm_up_for_rebalance=false'
+ 'cloud_warm_up_for_rebalance_type=without_warmup'
]
options.beConfigs += [
'report_tablet_interval_seconds=1',
diff --git a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
index 8ce71606542..4843c8e6773 100644
--- a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
@@ -38,6 +38,7 @@ suite('test_peer_read_async_warmup', 'docker') {
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
+ 'enable_cache_read_from_peer=true',
]
options.setFeNum(1)
options.setBeNum(1)
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
index 91410e246c1..d5f2e24c50c 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
@@ -29,7 +29,7 @@ suite('test_drop_cluster_clean_metrics', 'docker') {
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1',
'rehash_tablet_after_be_dead_seconds=3600',
- 'enable_cloud_warm_up_for_rebalance=false'
+ 'cloud_warm_up_for_rebalance_type=without_warmup'
]
options.beConfigs += [
'report_tablet_interval_seconds=1',
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 5ecc610bf5a..9f24d1b2dbf 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -107,7 +107,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
- 'enable_cloud_warm_up_for_rebalance=true',
+ 'cloud_warm_up_for_rebalance_type=async_warmup',
'cloud_tablet_rebalancer_interval_second=1',
'cloud_balance_tablet_percent_per_run=1.0',
]
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 83a8dc336de..81f2227ce44 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -40,8 +40,8 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
'sys_log_verbose_modules=org',
]
}
- clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 'cloud_pre_heating_time_limit_sec=300']
- clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
+ clusterOptions[0].feConfigs += ['cloud_warm_up_for_rebalance_type=sync_warmup','cloud_pre_heating_time_limit_sec=300']
+ clusterOptions[1].feConfigs += ['cloud_warm_up_for_rebalance_type=without_warmup']
for (int i = 0; i < clusterOptions.size(); i++) {
@@ -178,7 +178,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
// add a be
cluster.addBackend(1, null)
// warm up
- sql """admin set frontend
config("enable_cloud_warm_up_for_rebalance"="true")"""
+ sql """admin set frontend
config("cloud_warm_up_for_rebalance_type"="sync_warmup")"""
// test rebalance thread still work
awaitUntil(30) {