This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4794327ac03 branch-3.1: [fix](cloud) Fix cloud balance warm up lack of latest rs information #54404 (#54505)
4794327ac03 is described below

commit 4794327ac03d37485633f0654250633371125585
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 11:47:54 2025 +0800

    branch-3.1: [fix](cloud) Fix cloud balance warm up lack of latest rs information #54404 (#54505)
    
    Cherry-picked from #54404
    
    Co-authored-by: deardeng <[email protected]>
---
 be/src/cloud/cloud_internal_service.cpp            |   6 +
 be/src/io/cache/block_file_cache_downloader.cpp    |   3 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |   6 +-
 .../cloud_p0/balance/test_balance_warm_up.groovy   | 147 +++++++++++++++++++++
 4 files changed, 158 insertions(+), 4 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 0576cbe4deb..bc6689a4635 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -84,6 +84,12 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
             return;
         }
         CloudTabletSPtr tablet = std::move(res.value());
+        auto st = tablet->sync_rowsets();
+        if (!st) {
+            // just log failed, try it best
+            LOG(WARNING) << "failed to sync rowsets: " << tablet_id
+                         << " err msg: " << st.to_string();
+        }
         auto rowsets = tablet->get_snapshot_rowset();
         std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const RowsetSharedPtr& rowset) {
             std::string rowset_id = rowset->rowset_id().to_string();
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index c7cc5476436..92bb91a7a3f 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -174,7 +174,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
     std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
         VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
                    << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
-                   << ", offset=" << meta.offset() << ", size=" << meta.size();
+                   << ", offset=" << meta.offset() << ", size=" << meta.size()
+                   << ", type=" << meta.cache_type();
         CloudTabletSPtr tablet;
         if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) {
             LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index e7f080ea329..887490d95f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -830,15 +830,15 @@ public class CloudTabletRebalancer extends MasterDaemon {
             req.setTablets(tabletIds);
             TCheckWarmUpCacheAsyncResponse result = client.checkWarmUpCacheAsync(req);
             if (result.getStatus().getStatusCode() != TStatusCode.OK) {
-                LOG.warn("check pre cache status {} {}", result.getStatus().getStatusCode(),
+                LOG.warn("check pre tablets {} cache status {} {}", tabletIds, result.getStatus().getStatusCode(),
                         result.getStatus().getErrorMsgs());
             } else {
-                LOG.info("check pre cache succ status {} {}", result.getStatus().getStatusCode(),
+                LOG.info("check pre tablets {} cache succ status {} {}", tabletIds, result.getStatus().getStatusCode(),
                         result.getStatus().getErrorMsgs());
             }
             return result.getTaskDone();
         } catch (Exception e) {
-            LOG.warn("send check pre cache rpc error. backend[{}]", destBackend.getId(), e);
+            LOG.warn("send check pre cache rpc error. tablets{} backend[{}]", tabletIds, destBackend.getId(), e);
             ok = false;
         } finally {
             if (ok) {
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
new file mode 100644
index 00000000000..f6afcfb2f99
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        'enable_cloud_warm_up_for_rebalance=true'
+    ]
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+    
+    def testCase = { table -> 
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `v1` VARCHAR(2048)
+            )
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (10, '1'), (20, '2')
+        """
+        sql """
+            insert into $table values (30, '3'), (40, '4')
+        """
+
+        // before add be
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        // version 2
+        def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+        // version 3
+        def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+        def beforeMergedCacheDir = beforeCacheDirVersion2 + beforeCacheDirVersion3.collectEntries { host, hashFiles ->
+            [(host): beforeCacheDirVersion2[host] ? (beforeCacheDirVersion2[host] + hashFiles) : hashFiles]
+        }
+        logger.info("before fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, beforeMergedCacheDir)
+
+        def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+        logger.info("before warm up result {}", beforeWarmUpResult)
+
+        cluster.addBackend(1, "compute_cluster")
+        def oldBe = sql_return_maparray('show backends').get(0)
+        def newAddBe = sql_return_maparray('show backends').get(1)
+        // balance tablet
+        awaitUntil(500) {
+            def afterWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+            logger.info("after warm up result {}", afterWarmUpResult)
+            afterWarmUpResult.any { row -> 
+                Integer.valueOf((String) row.ReplicaNum) == 1
+            }
+        }
+
+        // from be1 -> be2, warm up this tablet
+        // after add be
+        def afterGetFromFe = getTabletAndBeHostFromFe(table)
+        def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        // version 2
+        def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+        // version 3
+        def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+
+        def afterMergedCacheDir = afterCacheDirVersion2 + afterCacheDirVersion3.collectEntries { host, hashFiles ->
+            [(host): afterCacheDirVersion2[host] ? (afterCacheDirVersion2[host] + hashFiles) : hashFiles]
+        }
+        logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+        def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+        logger.info("new add be cache dir {}", newAddBeCacheDir)
+        assert newAddBeCacheDir.size() != 0
+        assert beforeMergedCacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host])
+
+        def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+        def dataPath = new File("${be.path}/storage/file_cache")
+        logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+        logger.info("Directory exists: {}", dataPath.exists())
+        
+        def subDirs = []
+        
+        def collectDirs
+        collectDirs = { File dir ->
+            if (dir.exists()) {
+                dir.eachDir { subDir ->
+                    logger.info("Found subdir: {}", subDir.name)
+                    subDirs << subDir.name
+                    collectDirs(subDir) 
+                }
+            }
+        }
+        
+        collectDirs(dataPath)
+        logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+        newAddBeCacheDir.each { hashFile ->
+            assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, 
+            "Expected cache file pattern ${hashFile} not found in BE ${newAddBe.Host}'s file_cache directory. " + 
+            "Available subdirs: ${subDirs}")
+        }
+    }
+
+    docker(options) {
+        testCase("test_balance_warm_up_tbl")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to