This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6ee5dae4950 [fix](cloud) Fix cloud balance warm up lost idx file (#57114)
6ee5dae4950 is described below
commit 6ee5dae4950ec53d1111881f3a2eca2496608111
Author: deardeng <[email protected]>
AuthorDate: Wed Oct 22 22:10:42 2025 +0800
[fix](cloud) Fix cloud balance warm up lost idx file (#57114)
---
be/src/cloud/cloud_internal_service.cpp | 72 +++++++++++++-----
be/src/io/cache/block_file_cache_downloader.cpp | 16 +++-
gensrc/proto/internal_service.proto | 1 +
.../cloud_p0/balance/test_balance_warm_up.groovy | 86 ++++++++++++++++------
4 files changed, 130 insertions(+), 45 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 24f60ff1eb5..240ffe56c3c 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -96,28 +96,60 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
<< " err msg: " << st.to_string();
}
auto rowsets = tablet->get_snapshot_rowset();
-    std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const RowsetSharedPtr& rowset) {
-        std::string rowset_id = rowset->rowset_id().to_string();
-        for (int32_t segment_id = 0; segment_id < rowset->num_segments(); segment_id++) {
-            std::string file_name = fmt::format("{}_{}.dat", rowset_id, segment_id);
-            auto cache_key = io::BlockFileCache::hash(file_name);
-            auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
-            auto segments_meta = cache->get_hot_blocks_meta(cache_key);
-            for (const auto& tuple : segments_meta) {
-                FileCacheBlockMeta* meta = response->add_file_cache_block_metas();
-                meta->set_tablet_id(tablet_id);
-                meta->set_rowset_id(rowset_id);
-                meta->set_segment_id(segment_id);
-                meta->set_file_name(file_name);
-                meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id));
-                meta->set_offset(std::get<0>(tuple));
-                meta->set_size(std::get<1>(tuple));
-                meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple)));
-                meta->set_expiration_time(std::get<3>(tuple));
-            }
+    auto add_meta = [&](PGetFileCacheMetaResponse* resp, int64_t tablet_id,
+                        const std::string& rowset_id, int32_t segment_id,
+                        const std::string& file_name,
+                        const std::tuple<int64_t, int64_t, io::FileCacheType, int64_t>& tuple,
+                        const RowsetSharedPtr& rowset, bool is_index) {
+        FileCacheBlockMeta* meta = resp->add_file_cache_block_metas();
+        meta->set_tablet_id(tablet_id);
+        meta->set_rowset_id(rowset_id);
+        meta->set_segment_id(segment_id);
+        meta->set_file_name(file_name);
+
+        if (!is_index) {
+            // .dat
+            meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id));
+            meta->set_file_type(doris::FileType::SEGMENT_FILE);
+        } else {
+            // .idx
+            const auto& idx_file_info =
+                    rowset->rowset_meta()->inverted_index_file_info(segment_id);
+            meta->set_file_size(idx_file_info.has_index_size() ? idx_file_info.index_size()
+                                                                : -1);
+            meta->set_file_type(doris::FileType::INVERTED_INDEX_FILE);
+        }
+
+        meta->set_offset(std::get<0>(tuple));
+        meta->set_size(std::get<1>(tuple));
+        meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple)));
+        meta->set_expiration_time(std::get<3>(tuple));
+    };
+
+    auto process_file_for_segment = [&](PGetFileCacheMetaResponse* resp,
+                                        const RowsetSharedPtr& rowset, int64_t tablet_id,
+                                        const std::string& rowset_id, int32_t segment_id,
+                                        bool is_index) {
+        const char* extension = is_index ? ".idx" : ".dat";
+        std::string file_name = fmt::format("{}_{}{}", rowset_id, segment_id, extension);
+        auto cache_key = io::BlockFileCache::hash(file_name);
+        auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
+        if (!cache) return;
+        auto segments_meta = cache->get_hot_blocks_meta(cache_key);
+        for (const auto& tuple : segments_meta) {
+            add_meta(resp, tablet_id, rowset_id, segment_id, file_name, tuple, rowset,
+                     is_index);
}
- });
+ };
+
+    for (const RowsetSharedPtr& rowset : rowsets) {
+        std::string rowset_id = rowset->rowset_id().to_string();
+        for (int32_t segment_id = 0; segment_id < rowset->num_segments(); ++segment_id) {
+            process_file_for_segment(response, rowset, tablet_id, rowset_id, segment_id, false);
+            process_file_for_segment(response, rowset, tablet_id, rowset_id, segment_id, true);
+        }
+    }
}
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index d79fe269971..f5a2a287f83 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -219,9 +219,21 @@ void FileCacheBlockDownloader::download_file_cache_block(
<< "status=" << st.to_string();
};
+    std::string path;
+    doris::FileType file_type =
+            meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
+    bool is_index = (file_type == doris::FileType::INVERTED_INDEX_FILE);
+    if (is_index) {
+        path = storage_resource.value()->remote_idx_v2_path(*find_it->second,
+                                                            meta.segment_id());
+    } else {
+        // default .dat
+        path = storage_resource.value()->remote_segment_path(*find_it->second,
+                                                             meta.segment_id());
+    }
+
DownloadFileMeta download_meta {
-            .path = storage_resource.value()->remote_segment_path(*find_it->second,
-                                                                  meta.segment_id()),
+            .path = path,
            .file_size = meta.has_file_size() ? meta.file_size()
                                              : -1, // To avoid trigger get file size IO
.offset = meta.offset(),
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 89ec5cb6f09..8bbc80adfbb 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -854,6 +854,7 @@ message FileCacheBlockMeta {
required FileCacheType cache_type = 7;
required int64 expiration_time = 8;
optional int64 file_size = 9;
+ optional FileType file_type = 10;
}
message PGetFileCacheMetaResponse {
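Because file_type is added as an optional field (tag 10), a FileCacheBlockMeta serialized by an older BE simply omits it; the downloader change above therefore falls back to SEGMENT_FILE when has_file_type() is false, so mixed-version clusters keep warming up .dat files exactly as before. A hedged sketch of that read path, assuming the Doris build tree where the generated header is available as gen_cpp/internal_service.pb.h:

    #include <gen_cpp/internal_service.pb.h>

    // Sketch only: mirrors the fallback used in block_file_cache_downloader.cpp.
    // Returns true when the cached block belongs to an inverted index (.idx) file.
    bool is_inverted_index_block(const doris::FileCacheBlockMeta& meta) {
        // Older senders never set file_type, so a missing value is treated as a
        // plain segment (.dat) file to stay backward compatible.
        doris::FileType file_type =
                meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
        return file_type == doris::FileType::INVERTED_INDEX_FILE;
    }
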
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index f6afcfb2f99..7a18f22bb31 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -46,36 +46,71 @@ suite('test_balance_warm_up', 'docker') {
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
sql """CREATE TABLE $table (
- `k1` int(11) NULL,
- `v1` VARCHAR(2048)
+ `id` BIGINT,
+ `deleted` TINYINT,
+ `type` String,
+ `author` String,
+ `timestamp` DateTimeV2,
+ `comment` String,
+ `dead` TINYINT,
+ `parent` BIGINT,
+ `poll` BIGINT,
+ `children` Array<BIGINT>,
+ `url` String,
+ `score` INT,
+ `title` String,
+ `parts` Array<INT>,
+ `descendants` INT,
+ INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
)
- DUPLICATE KEY(`k1`)
- COMMENT 'OLAP'
- DISTRIBUTED BY HASH(`k1`) BUCKETS 2
- PROPERTIES (
- "replication_num"="1"
- );
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES ("replication_num" = "1");
"""
sql """
- insert into $table values (10, '1'), (20, '2')
+ insert into $table values (344083, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0);
"""
sql """
- insert into $table values (30, '3'), (40, '4')
+ insert into $table values (44, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0);
"""
// before add be
def beforeGetFromFe = getTabletAndBeHostFromFe(table)
def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
// version 2
- def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
- logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ def beforeDataCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "dat")
+ def beforeIdxCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "idx")
+ logger.info("cache dir version 2 data={}, idx={}", beforeDataCacheDirVersion2, beforeIdxCacheDirVersion2)
// version 3
- def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
- logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+ def beforeDataCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "dat")
+ def beforeIdxCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "idx")
+ logger.info("cache dir version 3 data={}, idx={}", beforeDataCacheDirVersion3, beforeIdxCacheDirVersion3)
- def beforeMergedCacheDir = beforeCacheDirVersion2 + beforeCacheDirVersion3.collectEntries { host, hashFiles ->
-     [(host): beforeCacheDirVersion2[host] ? (beforeCacheDirVersion2[host] + hashFiles) : hashFiles]
+ // Generic merge helper: merge multiple maps by host and de-duplicate the result
+ def mergeCacheDirs = { Map[] maps ->
+ def result = [:].withDefault { [] } // every missing key gets a fresh List
+ maps.each { m ->
+ if (!m) return
+ m.each { host, files ->
+ if (!files) return
+ def target = result[host]
+ if (files instanceof Collection) {
+ target.addAll(files)
+ } else {
+ target << files
+ }
+ }
+ }
+ // de-duplicate each host's file list before returning
+ result.collectEntries { host, files -> [ (host): files.unique() ] }
}
+
+ def beforeMergedCacheDir = mergeCacheDirs(
+ beforeDataCacheDirVersion2,
+ beforeIdxCacheDirVersion2,
+ beforeDataCacheDirVersion3,
+ beforeIdxCacheDirVersion3
+ )
logger.info("before fe tablets {}, be tablets {}, cache dir {}",
beforeGetFromFe, beforeGetFromBe, beforeMergedCacheDir)
def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
@@ -98,15 +133,20 @@ suite('test_balance_warm_up', 'docker') {
def afterGetFromFe = getTabletAndBeHostFromFe(table)
def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
// version 2
- def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
- logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+ def afterDataCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "dat")
+ def afterIdxCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "idx")
+ logger.info("after cache dir version 2 data={}, idx={}", afterDataCacheDirVersion2, afterIdxCacheDirVersion2)
// version 3
- def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
- logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+ def afterDataCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "dat")
+ def afterIdxCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "idx")
+ logger.info("after cache dir version 3 data={}, idx={}", afterDataCacheDirVersion3, afterIdxCacheDirVersion3)
- def afterMergedCacheDir = afterCacheDirVersion2 + afterCacheDirVersion3.collectEntries { host, hashFiles ->
-     [(host): afterCacheDirVersion2[host] ? (afterCacheDirVersion2[host] + hashFiles) : hashFiles]
- }
+ def afterMergedCacheDir = mergeCacheDirs(
+ afterDataCacheDirVersion2,
+ afterIdxCacheDirVersion2,
+ afterDataCacheDirVersion3,
+ afterIdxCacheDirVersion3
+ )
logger.info("after fe tablets {}, be tablets {}, cache dir {}",
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
logger.info("new add be cache dir {}", newAddBeCacheDir)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]