This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee5dae4950 [fix](cloud) Fix cloud balance warm up lost idx file (#57114)
6ee5dae4950 is described below

commit 6ee5dae4950ec53d1111881f3a2eca2496608111
Author: deardeng <[email protected]>
AuthorDate: Wed Oct 22 22:10:42 2025 +0800

    [fix](cloud) Fix cloud balance warm up lost idx file (#57114)
---
 be/src/cloud/cloud_internal_service.cpp            | 72 +++++++++++++-----
 be/src/io/cache/block_file_cache_downloader.cpp    | 16 +++-
 gensrc/proto/internal_service.proto                |  1 +
 .../cloud_p0/balance/test_balance_warm_up.groovy   | 86 ++++++++++++++++------
 4 files changed, 130 insertions(+), 45 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 24f60ff1eb5..240ffe56c3c 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -96,28 +96,60 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
                          << " err msg: " << st.to_string();
         }
         auto rowsets = tablet->get_snapshot_rowset();
-        std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const RowsetSharedPtr& rowset) {
-            std::string rowset_id = rowset->rowset_id().to_string();
-            for (int32_t segment_id = 0; segment_id < rowset->num_segments(); segment_id++) {
-                std::string file_name = fmt::format("{}_{}.dat", rowset_id, segment_id);
-                auto cache_key = io::BlockFileCache::hash(file_name);
-                auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
 
-                auto segments_meta = cache->get_hot_blocks_meta(cache_key);
-                for (const auto& tuple : segments_meta) {
-                    FileCacheBlockMeta* meta = response->add_file_cache_block_metas();
-                    meta->set_tablet_id(tablet_id);
-                    meta->set_rowset_id(rowset_id);
-                    meta->set_segment_id(segment_id);
-                    meta->set_file_name(file_name);
-                    meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id));
-                    meta->set_offset(std::get<0>(tuple));
-                    meta->set_size(std::get<1>(tuple));
-                    meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple)));
-                    meta->set_expiration_time(std::get<3>(tuple));
-                }
+        auto add_meta = [&](PGetFileCacheMetaResponse* resp, int64_t tablet_id,
+                            const std::string& rowset_id, int32_t segment_id,
+                            const std::string& file_name,
+                            const std::tuple<int64_t, int64_t, io::FileCacheType, int64_t>& tuple,
+                            const RowsetSharedPtr& rowset, bool is_index) {
+            FileCacheBlockMeta* meta = resp->add_file_cache_block_metas();
+            meta->set_tablet_id(tablet_id);
+            meta->set_rowset_id(rowset_id);
+            meta->set_segment_id(segment_id);
+            meta->set_file_name(file_name);
+
+            if (!is_index) {
+                // .dat
+                meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id));
+                meta->set_file_type(doris::FileType::SEGMENT_FILE);
+            } else {
+                // .idx
+                const auto& idx_file_info =
+                        rowset->rowset_meta()->inverted_index_file_info(segment_id);
+                meta->set_file_size(idx_file_info.has_index_size() ? idx_file_info.index_size()
+                                                                   : -1);
+                meta->set_file_type(doris::FileType::INVERTED_INDEX_FILE);
+            }
+
+            meta->set_offset(std::get<0>(tuple));
+            meta->set_size(std::get<1>(tuple));
+            meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple)));
+            meta->set_expiration_time(std::get<3>(tuple));
+        };
+
+        auto process_file_for_segment = [&](PGetFileCacheMetaResponse* resp,
+                                            const RowsetSharedPtr& rowset, int64_t tablet_id,
+                                            const std::string& rowset_id, int32_t segment_id,
+                                            bool is_index) {
+            const char* extension = is_index ? ".idx" : ".dat";
+            std::string file_name = fmt::format("{}_{}{}", rowset_id, segment_id, extension);
+            auto cache_key = io::BlockFileCache::hash(file_name);
+            auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
+            if (!cache) return;
+            auto segments_meta = cache->get_hot_blocks_meta(cache_key);
+            for (const auto& tuple : segments_meta) {
+                add_meta(resp, tablet_id, rowset_id, segment_id, file_name, tuple, rowset,
+                         is_index);
             }
-        });
+        };
+
+        for (const RowsetSharedPtr& rowset : rowsets) {
+            std::string rowset_id = rowset->rowset_id().to_string();
+            for (int32_t segment_id = 0; segment_id < rowset->num_segments(); ++segment_id) {
+                process_file_for_segment(response, rowset, tablet_id, rowset_id, segment_id, false);
+                process_file_for_segment(response, rowset, tablet_id, rowset_id, segment_id, true);
+            }
+        }
     }
     VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
                << ", response=" << response->DebugString();
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index d79fe269971..f5a2a287f83 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -219,9 +219,21 @@ void FileCacheBlockDownloader::download_file_cache_block(
                       << "status=" << st.to_string();
         };
 
+        std::string path;
+        doris::FileType file_type =
+                meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
+        bool is_index = (file_type == doris::FileType::INVERTED_INDEX_FILE);
+        if (is_index) {
+            path = storage_resource.value()->remote_idx_v2_path(*find_it->second,
+                                                                meta.segment_id());
+        } else {
+            // default .dat
+            path = storage_resource.value()->remote_segment_path(*find_it->second,
+                                                                 meta.segment_id());
+        }
+
         DownloadFileMeta download_meta {
-                .path = storage_resource.value()->remote_segment_path(*find_it->second,
-                                                                      meta.segment_id()),
+                .path = path,
                 .file_size = meta.has_file_size() ? meta.file_size()
                                                   : -1, // To avoid trigger get file size IO
                 .offset = meta.offset(),
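
The downloader change keeps mixed-version clusters working: when the peer BE predates this patch and never sets the new field, the block is still treated as a plain segment download and the path falls back to remote_segment_path() as before. A minimal sketch of that fallback, assuming the generated protobuf types from internal_service.proto:

    // Sketch only: resolve the effective file type for a warm-up block.
    // An old peer that omits file_type still maps to SEGMENT_FILE.
    doris::FileType resolve_file_type(const doris::FileCacheBlockMeta& meta) {
        return meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
    }
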
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 89ec5cb6f09..8bbc80adfbb 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -854,6 +854,7 @@ message FileCacheBlockMeta {
     required FileCacheType cache_type = 7;
     required int64 expiration_time = 8;
     optional int64 file_size = 9;
+    optional FileType file_type = 10;
 }
 
 message PGetFileCacheMetaResponse {
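
Because file_type is an optional field appended at tag 10, the proto change is additive: old readers ignore it and old writers simply omit it. A rough producer-side sketch (hypothetical values; the other block fields such as offset, size, cache_type and expiration_time are filled exactly as before and omitted here):

    doris::FileCacheBlockMeta meta;
    meta.set_tablet_id(10001);                                  // hypothetical tablet id
    meta.set_rowset_id("020000000000000001");                   // hypothetical rowset id
    meta.set_segment_id(0);
    meta.set_file_name("020000000000000001_0.idx");
    meta.set_file_type(doris::FileType::INVERTED_INDEX_FILE);   // new field in this patch
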
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index f6afcfb2f99..7a18f22bb31 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -46,36 +46,71 @@ suite('test_balance_warm_up', 'docker') {
         def ms = cluster.getAllMetaservices().get(0)
         def msHttpPort = ms.host + ":" + ms.httpPort
         sql """CREATE TABLE $table (
-            `k1` int(11) NULL,
-            `v1` VARCHAR(2048)
+                `id` BIGINT,
+                `deleted` TINYINT,
+                `type` String,
+                `author` String,
+                `timestamp` DateTimeV2,
+                `comment` String,
+                `dead` TINYINT,
+                `parent` BIGINT,
+                `poll` BIGINT,
+                `children` Array<BIGINT>,
+                `url` String,
+                `score` INT,
+                `title` String,
+                `parts` Array<INT>,
+                `descendants` INT,
+                INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
             )
-            DUPLICATE KEY(`k1`)
-            COMMENT 'OLAP'
-            DISTRIBUTED BY HASH(`k1`) BUCKETS 2
-            PROPERTIES (
-            "replication_num"="1"
-            );
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 2 
+            PROPERTIES ("replication_num" = "1");
         """
         sql """
-            insert into $table values (10, '1'), (20, '2')
+            insert into $table values  (344083, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...',  0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!',  0, 31, 0, [34, 454450], '', 0, '', [], 0);
         """
         sql """
-            insert into $table values (30, '3'), (40, '4')
+            insert into $table values  (44, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall',  0, 43, 0, [454465], '', 0, '', [], 0), (46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '',  0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0);
         """
 
         // before add be
         def beforeGetFromFe = getTabletAndBeHostFromFe(table)
         def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
         // version 2
-        def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
-        logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+        def beforeDataCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "dat")
+        def beforeIdxCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "idx")
+        logger.info("cache dir version 2 data={}, idx={}", beforeDataCacheDirVersion2, beforeIdxCacheDirVersion2)
         // version 3
-        def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
-        logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+        def beforeDataCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "dat")
+        def beforeIdxCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "idx")
+        logger.info("cache dir version 3 data={}, idx={}", beforeDataCacheDirVersion3, beforeIdxCacheDirVersion3)
 
-        def beforeMergedCacheDir = beforeCacheDirVersion2 + beforeCacheDirVersion3.collectEntries { host, hashFiles ->
-            [(host): beforeCacheDirVersion2[host] ? (beforeCacheDirVersion2[host] + hashFiles) : hashFiles]
+        // Generic merge helper: merge multiple maps by host and deduplicate
+        def mergeCacheDirs = { Map[] maps ->
+            def result = [:].withDefault { [] } // every missing key gets a new List
+            maps.each { m ->
+                if (!m) return
+                m.each { host, files ->
+                    if (!files) return
+                    def target = result[host]
+                    if (files instanceof Collection) {
+                        target.addAll(files)
+                    } else {
+                        target << files
+                    }
+                }
+            }
+            // return each host's file list with duplicates removed
+            result.collectEntries { host, files -> [ (host): files.unique() ] }
         }
+
+        def beforeMergedCacheDir = mergeCacheDirs(
+            beforeDataCacheDirVersion2,
+            beforeIdxCacheDirVersion2,
+            beforeDataCacheDirVersion3,
+            beforeIdxCacheDirVersion3
+        )
         logger.info("before fe tablets {}, be tablets {}, cache dir {}", 
beforeGetFromFe, beforeGetFromBe, beforeMergedCacheDir)
 
         def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
@@ -98,15 +133,20 @@ suite('test_balance_warm_up', 'docker') {
         def afterGetFromFe = getTabletAndBeHostFromFe(table)
         def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
         // version 2
-        def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
-        logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+        def afterDataCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "dat")
+        def afterIdxCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "idx")
+        logger.info("after cache dir version 2 data={}, idx={}", afterDataCacheDirVersion2, afterIdxCacheDirVersion2)
         // version 3
-        def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
-        logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+        def afterDataCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "dat")
+        def afterIdxCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "idx")
+        logger.info("after cache dir version 3 data={}, idx={}", afterDataCacheDirVersion3, afterIdxCacheDirVersion3)
 
-        def afterMergedCacheDir = afterCacheDirVersion2 + afterCacheDirVersion3.collectEntries { host, hashFiles ->
-            [(host): afterCacheDirVersion2[host] ? (afterCacheDirVersion2[host] + hashFiles) : hashFiles]
-        }
+        def afterMergedCacheDir = mergeCacheDirs(
+            afterDataCacheDirVersion2,
+            afterIdxCacheDirVersion2,
+            afterDataCacheDirVersion3,
+            afterIdxCacheDirVersion3 
+        )
         logger.info("after fe tablets {}, be tablets {}, cache dir {}", 
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
         def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
         logger.info("new add be cache dir {}", newAddBeCacheDir)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
