This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a17454c7e6b branch-4.0: [feature](cloud) Support balance sync warm up 
#56164 (#57533)
a17454c7e6b is described below

commit a17454c7e6b7ea51e57463a926ba9b518dce5b38
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 6 19:53:09 2025 +0800

    branch-4.0: [feature](cloud) Support balance sync warm up #56164 (#57533)
    
    Cherry-picked from #56164
    
    Co-authored-by: deardeng <[email protected]>
---
 be/src/io/cache/block_file_cache_downloader.cpp    |  14 +-
 cloud/src/meta-service/meta_service_resource.cpp   |  27 +-
 .../main/java/org/apache/doris/common/Config.java  |  22 +-
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   2 +
 .../doris/cloud/catalog/BalanceTypeEnum.java       |  69 +++++
 .../cloud/catalog/CloudInstanceStatusChecker.java  |  53 ++++
 .../doris/cloud/catalog/CloudTabletRebalancer.java | 179 +++++++++--
 .../apache/doris/cloud/catalog/ComputeGroup.java   | 153 +++++++++
 .../doris/cloud/system/CloudSystemInfoService.java |  44 +++
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   9 +
 .../apache/doris/nereids/trees/plans/PlanType.java |   1 +
 .../plans/commands/AlterComputeGroupCommand.java   |  98 ++++++
 .../trees/plans/commands/ShowClustersCommand.java  |  19 +-
 .../trees/plans/visitor/CommandVisitor.java        |   5 +
 .../doris/cloud/catalog/ComputeGroupTest.java      | 341 +++++++++++++++++++++
 .../commands/AlterComputeGroupCommandTest.java     | 243 +++++++++++++++
 .../trees/plans/commands/ShowComputeGroupTest.java |  91 ++++++
 gensrc/proto/cloud.proto                           |   2 +
 .../test_alter_compute_group_properties.groovy     |  92 ++++++
 ...est_balance_use_compute_group_properties.groovy | 212 +++++++++++++
 .../cloud_p0/balance/test_balance_warm_up.groovy   |   2 +-
 .../test_balance_warm_up_sync_global_config.groovy | 179 +++++++++++
 .../test_balance_warm_up_task_abnormal.groovy      | 121 ++++++++
 .../balance/test_peer_read_async_warmup.groovy     | 169 ++++++++++
 .../test_warmup_rebalance.groovy                   |   2 +-
 25 files changed, 2116 insertions(+), 33 deletions(-)

diff --git a/be/src/io/cache/block_file_cache_downloader.cpp 
b/be/src/io/cache/block_file_cache_downloader.cpp
index f5a2a287f83..46ce90aafcc 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -27,6 +27,7 @@
 
 #include <memory>
 #include <mutex>
+#include <unordered_set>
 #include <variant>
 
 #include "cloud/cloud_tablet_mgr.h"
@@ -171,6 +172,7 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> 
snapshot_rs_metas(BaseTable
 
 void FileCacheBlockDownloader::download_file_cache_block(
         const DownloadTask::FileCacheBlockMetaVec& metas) {
+    std::unordered_set<int64_t> synced_tablets;
     std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
         VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << 
meta.tablet_id()
                    << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << 
meta.segment_id()
@@ -183,12 +185,20 @@ void FileCacheBlockDownloader::download_file_cache_block(
         } else {
             tablet = std::move(res).value();
         }
-
+        if (!synced_tablets.contains(meta.tablet_id())) {
+            auto st = tablet->sync_rowsets();
+            if (!st) {
+                // best effort: only log the failure and keep going
+                LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id()
+                             << " err msg: " << st.to_string();
+            }
+            synced_tablets.insert(meta.tablet_id());
+        }
         auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
         auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
         if (find_it == id_to_rowset_meta_map.end()) {
             LOG(WARNING) << "download_file_cache_block: tablet_id=" << 
meta.tablet_id()
-                         << "rowset_id not found, rowset_id=" << 
meta.rowset_id();
+                         << " rowset_id not found, rowset_id=" << 
meta.rowset_id();
             return;
         }
 
diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index 6c8add49cc0..77a74abd521 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -2607,7 +2607,7 @@ void handle_set_cluster_status(const std::string& 
instance_id, const ClusterInfo
             });
 }
 
-void handle_alter_vcluster_Info(const std::string& instance_id, const 
ClusterInfo& cluster,
+void handle_alter_vcluster_info(const std::string& instance_id, const 
ClusterInfo& cluster,
                                 std::shared_ptr<ResourceManager> resource_mgr, 
std::string& msg,
                                 MetaServiceCode& code) {
     msg = resource_mgr->update_cluster(
@@ -2694,6 +2694,26 @@ void handle_alter_vcluster_Info(const std::string& 
instance_id, const ClusterInf
             });
 }
 
+void handle_alter_properties(const std::string& instance_id, const 
ClusterInfo& cluster,
+                             std::shared_ptr<ResourceManager> resource_mgr, 
std::string& msg,
+                             MetaServiceCode& code) {
+    msg = resource_mgr->update_cluster(
+            instance_id, cluster,
+            [&](const ClusterPB& i) { return i.cluster_id() == 
cluster.cluster.cluster_id(); },
+            [&](ClusterPB& c, std::vector<ClusterPB>&) {
+                std::string msg;
+                std::stringstream ss;
+                if (ClusterPB::COMPUTE != c.type()) {
+                    code = MetaServiceCode::INVALID_ARGUMENT;
+                    ss << "just support set COMPUTE cluster status";
+                    msg = ss.str();
+                    return msg;
+                }
+                *c.mutable_properties() = cluster.cluster.properties();
+                return msg;
+            });
+}
+
 void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* 
controller,
                                     const AlterClusterRequest* request,
                                     AlterClusterResponse* response,
@@ -2778,7 +2798,10 @@ void 
MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
         handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, 
code);
         break;
     case AlterClusterRequest::ALTER_VCLUSTER_INFO:
-        handle_alter_vcluster_Info(instance_id, cluster, resource_mgr(), msg, 
code);
+        handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, 
code);
+        break;
+    case AlterClusterRequest::ALTER_PROPERTIES:
+        handle_alter_properties(instance_id, cluster, resource_mgr(), msg, 
code);
         break;
     default:
         code = MetaServiceCode::INVALID_ARGUMENT;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1b386f98588..8d943d5c7e7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3333,8 +3333,26 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int cloud_min_balance_tablet_num_per_run = 2;
 
-    @ConfField(mutable = true, masterOnly = true)
-    public static boolean enable_cloud_warm_up_for_rebalance = true;
+    @ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
+            + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
+            + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
+            + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
+            + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer 
BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
+            + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+            + "设置compute group维度的balance类型,compute group维度配置优先级更高",
+        "Specify the scaling and warming methods for all Compute groups in a 
cloud mode. "
+            + "without_warmup: Directly modify shard mapping, first read from 
S3,"
+            + "fastest re-balance but largest fluctuation; "
+            + "async_warmup: Asynchronous warmup, best-effort cache pulling, "
+            + "faster re-balance but possible cache miss; "
+            + "sync_warmup: Synchronous warmup, ensure cache migration 
completion, "
+            + "slower re-balance but no cache miss; "
+            + "peer_read_async_warmup: Directly modify shard mapping, first 
read from Peer BE, "
+            + "fastest re-balance but may affect other BEs in the same compute 
group performance. "
+            + "Note: This is a global FE configuration, you can also use SQL 
(ALTER COMPUTE GROUP cg PROPERTIES) "
+            + "to set balance type at compute group level, compute group level 
configuration has higher priority"},
+            options = {"without_warmup", "async_warmup", "sync_warmup", 
"peer_read_async_warmup"})
+    public static String cloud_warm_up_for_rebalance_type = "async_warmup";
 
     @ConfField(mutable = true, masterOnly = false)
     public static String security_checker_class_name = "";
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 14b02187173..38fe0e9247b 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -264,6 +264,8 @@ supportedAlterStatement
     | ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause   
             #alterStorageVault
     | ALTER WORKLOAD GROUP name=identifierOrText (FOR 
computeGroup=identifierOrText)?
         properties=propertyClause?                                             
             #alterWorkloadGroup
+    | ALTER COMPUTE GROUP name=identifierOrText
+        properties=propertyClause?                                             
             #alterComputeGroup
     | ALTER CATALOG name=identifier SET PROPERTIES
         LEFT_PAREN propertyItemList RIGHT_PAREN                                
             #alterCatalogProperties        
     | ALTER WORKLOAD POLICY name=identifierOrText
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java
new file mode 100644
index 00000000000..d66e3126d5b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.common.Config;
+
+import lombok.Getter;
+
+/**
+ * Enum for balance type options
+ */
+@Getter
+public enum BalanceTypeEnum {
+    WITHOUT_WARMUP("without_warmup"),
+    ASYNC_WARMUP("async_warmup"),
+    SYNC_WARMUP("sync_warmup"),
+    PEER_READ_ASYNC_WARMUP("peer_read_async_warmup");
+
+    private final String value;
+
+    BalanceTypeEnum(String value) {
+        this.value = value;
+    }
+
+    /**
+     * Parse string value to enum, case-insensitive
+     */
+    public static BalanceTypeEnum fromString(String value) {
+        if (value == null) {
+            return null;
+        }
+        for (BalanceTypeEnum type : BalanceTypeEnum.values()) {
+            if (type.value.equalsIgnoreCase(value)) {
+                return type;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Check if the given string is a valid balance type
+     */
+    public static boolean isValid(String value) {
+        return fromString(value) != null;
+    }
+
+    /**
+     * Get the balance type enum from the configuration string
+     */
+    public static BalanceTypeEnum getCloudWarmUpForRebalanceTypeEnum() {
+        return fromString(Config.cloud_warm_up_for_rebalance_type) == null
+            ? ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM : 
fromString(Config.cloud_warm_up_for_rebalance_type);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
index 044f24d2242..40feb8b787b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
@@ -99,10 +99,63 @@ public class CloudInstanceStatusChecker extends 
MasterDaemon {
         List<Cloud.ClusterPB> virtualClusters = new ArrayList<>();
         List<Cloud.ClusterPB> computeClusters = new ArrayList<>();
         categorizeClusters(clusters, virtualClusters, computeClusters);
+        handleComputeClusters(computeClusters);
         handleVirtualClusters(virtualClusters, computeClusters);
         removeObsoleteVirtualGroups(virtualClusters);
     }
 
+    private void handleComputeClusters(List<Cloud.ClusterPB> computeClusters) {
+        for (Cloud.ClusterPB computeClusterInMs : computeClusters) {
+            ComputeGroup computeGroupInFe = cloudSystemInfoService
+                    .getComputeGroupById(computeClusterInMs.getClusterId());
+            if (computeGroupInFe == null) {
+                // cluster checker will sync it
+                LOG.info("found compute cluster {} in ms, but not in fe mem, "
+                        + "it may be wait cluster checker to sync, ignore it",
+                        computeClusterInMs);
+            } else {
+                // compute group already exists in FE: update its properties if 
they changed
+                updatePropertiesIfChanged(computeGroupInFe, 
computeClusterInMs);
+            }
+        }
+    }
+
+    /**
+     * Compare properties between compute cluster in MS and compute group in 
FE,
+     * update only the changed key-value pairs to avoid unnecessary updates.
+     */
+    private void updatePropertiesIfChanged(ComputeGroup computeGroupInFe, 
Cloud.ClusterPB computeClusterInMs) {
+        Map<String, String> propertiesInMs = 
computeClusterInMs.getPropertiesMap();
+        Map<String, String> propertiesInFe = computeGroupInFe.getProperties();
+
+        if (propertiesInMs == null || propertiesInMs.isEmpty()) {
+            return;
+        }
+        Map<String, String> changedProperties = new HashMap<>();
+
+        // Check for changed or new properties
+        for (Map.Entry<String, String> entry : propertiesInMs.entrySet()) {
+            String key = entry.getKey();
+            String valueInMs = entry.getValue();
+            String valueInFe = propertiesInFe.get(key);
+
+            if (valueInFe != null && valueInFe.equalsIgnoreCase(valueInMs)) {
+                continue;
+            }
+            changedProperties.put(key, valueInMs);
+
+            LOG.debug("Property changed for compute group {}: {} = {} (was: 
{})",
+                    computeGroupInFe.getName(), key, valueInMs, valueInFe);
+        }
+
+        // Only update if there are actual changes
+        if (!changedProperties.isEmpty()) {
+            LOG.info("Updating properties for compute group {}: {}",
+                    computeGroupInFe.getName(), changedProperties);
+            computeGroupInFe.setProperties(changedProperties);
+        }
+    }
+
     private void categorizeClusters(List<Cloud.ClusterPB> clusters,
                                     List<Cloud.ClusterPB> virtualClusters, 
List<Cloud.ClusterPB> computeClusters) {
         for (Cloud.ClusterPB cluster : clusters) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index d8d6b712b2e..a33667f0f64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -112,6 +112,54 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     private CloudSystemInfoService cloudSystemInfoService;
 
+    private BalanceTypeEnum globalBalanceTypeEnum = 
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
+
+    /**
+     * Get the current balance type for a compute group, falling back to the 
global balance type if the group is not found or its balance type and timeout are both still the defaults
+     */
+    private BalanceTypeEnum getCurrentBalanceType(String clusterId) {
+        ComputeGroup cg = 
cloudSystemInfoService.getComputeGroupById(clusterId);
+        if (cg == null) {
+            LOG.debug("compute group not found, use global balance type, id 
{}", clusterId);
+            return globalBalanceTypeEnum;
+        }
+
+        BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType();
+        if (isComputeGroupBalanceChanged(clusterId)) {
+            return computeGroupBalanceType;
+        }
+        return globalBalanceTypeEnum;
+    }
+
+    /**
+     * Get the current task timeout for a compute group, falling back to the 
global timeout if the group is not found or its balance type and timeout are both still the defaults
+     */
+    private int getCurrentTaskTimeout(String clusterId) {
+        ComputeGroup cg = 
cloudSystemInfoService.getComputeGroupById(clusterId);
+        if (cg == null) {
+            return Config.cloud_pre_heating_time_limit_sec;
+        }
+
+        int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout();
+        if (isComputeGroupBalanceChanged(clusterId)) {
+            return computeGroupTimeout;
+        }
+
+        return Config.cloud_pre_heating_time_limit_sec;
+    }
+
+    private boolean isComputeGroupBalanceChanged(String clusterId) {
+        ComputeGroup cg = 
cloudSystemInfoService.getComputeGroupById(clusterId);
+        if (cg == null) {
+            return false;
+        }
+
+        BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType();
+        int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout();
+        return computeGroupBalanceType != 
ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM
+               || computeGroupTimeout != 
ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT;
+    }
+
     public CloudTabletRebalancer(CloudSystemInfoService 
cloudSystemInfoService) {
         super("cloud tablet rebalancer", 
Config.cloud_tablet_rebalancer_interval_second * 1000);
         this.cloudSystemInfoService = cloudSystemInfoService;
@@ -239,6 +287,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
         LOG.info("cloud tablet rebalance begin");
         long start = System.currentTimeMillis();
+        globalBalanceTypeEnum = 
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
 
         buildClusterToBackendMap();
         if (!completeRouteInfo()) {
@@ -417,10 +466,25 @@ public class CloudTabletRebalancer extends MasterDaemon {
     public void checkInflightWarmUpCacheAsync() {
         Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, 
List<InfightTask>>();
 
+        Set<InfightTablet> invalidTasks = new HashSet<>();
         for (Map.Entry<InfightTablet, InfightTask> entry : 
tabletToInfightTask.entrySet()) {
+            String clusterId = entry.getKey().getClusterId();
+            BalanceTypeEnum balanceTypeEnum = getCurrentBalanceType(clusterId);
+            if (balanceTypeEnum == BalanceTypeEnum.WITHOUT_WARMUP) {
+                // warm-up is disabled for this cluster, so there is nothing to check
+                invalidTasks.add(entry.getKey());
+                continue;
+            }
             beToInfightTasks.putIfAbsent(entry.getValue().destBe, new 
ArrayList<>());
             
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
         }
+        invalidTasks.forEach(key -> {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("remove inflight warmup task tablet {} cluster {} no 
need warmup",
+                        key.getTabletId(), key.getClusterId());
+            }
+            tabletToInfightTask.remove(key);
+        });
 
         List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         long needRehashDeadTime = System.currentTimeMillis() - 
Config.rehash_tablet_after_be_dead_seconds * 1000L;
@@ -432,6 +496,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 destBackend = null;
             }
             if (destBackend == null || (!destBackend.isAlive() && 
destBackend.getLastUpdateMs() < needRehashDeadTime)) {
+                // dest backend does not exist or has been dead too long: remove all 
inflight tasks that target this dest backend
                 List<InfightTablet> toRemove = new LinkedList<>();
                 for (InfightTask task : entry.getValue()) {
                     for (InfightTablet key : tabletToInfightTask.keySet()) {
@@ -447,10 +512,12 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 continue;
             }
             if (!destBackend.isAlive()) {
+                // dest backend is dead, but for less than 
rehash_tablet_after_be_dead_seconds: wait for the next run
                 continue;
             }
             List<Long> tablets = entry.getValue().stream()
                     .map(task -> 
task.pickedTablet.getId()).collect(Collectors.toList());
+            // ask the dest backend whether the cache warm-up of these tablets is done
             Map<Long, Boolean> taskDone = 
sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
             if (taskDone == null) {
                 LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, 
inFight tasks {}",
@@ -461,17 +528,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
                 InfightTask task = tabletToInfightTask
                         .getOrDefault(new InfightTablet(result.getKey(), 
clusterId), null);
-                if (task != null && (result.getValue() || 
System.currentTimeMillis() / 1000 - task.startTimestamp
-                            > Config.cloud_pre_heating_time_limit_sec)) {
-                    if (!result.getValue()) {
-                        LOG.info("{} pre cache timeout, forced to change the 
mapping", result.getKey());
-                    }
-                    updateClusterToBeMap(task.pickedTablet, task.destBe, 
clusterId, infos);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("remove tablet {}-{}", clusterId, 
task.pickedTablet.getId());
-                    }
-                    tabletToInfightTask.remove(new 
InfightTablet(task.pickedTablet.getId(), clusterId));
-                }
+                handleWarmupCompletion(task, clusterId, result.getValue(), 
result.getKey(), infos);
             }
         }
         long oldSize = infos.size();
@@ -855,6 +912,70 @@ public class CloudTabletRebalancer extends MasterDaemon {
         return null;
     }
 
+    private void handleWarmupCompletion(InfightTask task, String clusterId, 
boolean isDone, long tabletId,
+                                           List<UpdateCloudReplicaInfo> infos) 
{
+        if (task == null) {
+            LOG.warn("cannot find inflight task for tablet {}-{}", clusterId, 
tabletId);
+            return;
+        }
+        boolean shouldUpdateMapping = false;
+        BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
+        LOG.debug("cluster id {}, balance type {}, tabletId {}, ", clusterId, 
currentBalanceType, tabletId);
+
+        switch (currentBalanceType) {
+            case ASYNC_WARMUP: {
+                int currentTaskTimeout = getCurrentTaskTimeout(clusterId);
+                boolean timeExceeded = System.currentTimeMillis() / 1000 - 
task.startTimestamp > currentTaskTimeout;
+                LOG.debug("tablet {}-{} warmup cache isDone {} timeExceeded 
{}",
+                        clusterId, tabletId, isDone, timeExceeded);
+                if (isDone || timeExceeded) {
+                    if (!isDone) {
+                        // timed out before finishing: abnormal, so log at info level
+                        LOG.info("{}-{} warmup cache timeout {}, forced to 
change the mapping",
+                                clusterId, tabletId, currentTaskTimeout);
+                    } else {
+                        // done, normal
+                        LOG.debug("{}-{} warmup cache done, change the 
mapping", clusterId, tabletId);
+                    }
+                    shouldUpdateMapping = true;
+                }
+                break;
+            }
+            case SYNC_WARMUP: {
+                if (isDone) {
+                    // done, normal
+                    LOG.debug("{} sync cache done, change the mapping", 
tabletId);
+                    shouldUpdateMapping = true;
+                }
+                break;
+            }
+            default:
+                break;
+        }
+
+        if (!shouldUpdateMapping) {
+            return;
+        }
+
+        updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("remove tablet {}-{}", clusterId, 
task.pickedTablet.getId());
+        }
+        tabletToInfightTask.remove(new 
InfightTablet(task.pickedTablet.getId(), clusterId));
+
+        if (BalanceTypeEnum.SYNC_WARMUP.equals(currentBalanceType)) {
+            try {
+                // resend the pre-heating rpc and ignore the result: a best-effort 
attempt to sync some new data
+                sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
+            } catch (Exception e) {
+                LOG.warn("Failed to preheat tablet {} from {} to {}, "
+                                + "help msg turn off fe config 
enable_cloud_warm_up_for_rebalance",
+                        task.pickedTablet.getId(), task.srcBe, task.destBe, e);
+            }
+        }
+    }
+
     private void updateBeToTablets(Tablet pickedTablet, long srcBe, long 
destBe,
             Map<Long, Set<Tablet>> globalBeToTablets,
             Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
@@ -1050,6 +1171,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
         long avgNum = totalTabletsNum / beNum;
         long transferNum = calculateTransferNum(avgNum);
 
+        BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
+        LOG.debug("balance type {}, be num {}, total tablets num {}, avg num 
{}, transfer num {}",
+                currentBalanceType, beNum, totalTabletsNum, avgNum, 
transferNum);
+
         for (int i = 0; i < transferNum; i++) {
             TransferPairInfo pairInfo = new TransferPairInfo();
             if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
@@ -1069,17 +1194,34 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
             CloudReplica cloudReplica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
             Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
 
-            if (Config.enable_cloud_warm_up_for_rebalance && srcBackend != 
null && srcBackend.isAlive()) {
-                if (isConflict(srcBe, destBe, cloudReplica, balanceType,
-                        futurePartitionToTablets, futureBeToTabletsInTable)) {
+            if ((BalanceTypeEnum.WITHOUT_WARMUP.equals(currentBalanceType)
+                    || 
BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType))
+                    && srcBackend != null && srcBackend.isAlive()) {
+                // direct switch: update the FE meta right away instead of waiting 
for a preheating task
+                if (isConflict(srcBe, destBe, cloudReplica, balanceType, 
partitionToTablets, beToTabletsInTable)) {
                     continue;
                 }
-                preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, beToTablets);
+                transferTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, infos);
+                if 
(BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) {
+                    LOG.debug("directly switch {} from {} to {}, cluster {}", 
pickedTablet.getId(), srcBe, destBe,
+                            clusterId);
+                    // send sync cache rpc, best effort
+                    try {
+                        sendPreHeatingRpc(pickedTablet, srcBe, destBe);
+                    } catch (Exception e) {
+                        LOG.debug("Failed to preheat tablet {} from {} to {}, "
+                                + "directly policy, just ignore the error",
+                                pickedTablet.getId(), srcBe, destBe, e);
+                        return;
+                    }
+                }
             } else {
-                if (isConflict(srcBe, destBe, cloudReplica, balanceType, 
partitionToTablets, beToTabletsInTable)) {
+                // cache warm up
+                if (isConflict(srcBe, destBe, cloudReplica, balanceType,
+                        futurePartitionToTablets, futureBeToTabletsInTable)) {
                     continue;
                 }
-                transferTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, infos);
+                preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, beToTablets);
             }
         }
     }
@@ -1147,7 +1289,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, 
String clusterId,
                             BalanceType balanceType, 
List<UpdateCloudReplicaInfo> infos) {
-        LOG.info("transfer {} from {} to {}, cluster {}", 
pickedTablet.getId(), srcBe, destBe, clusterId);
+        LOG.info("transfer {} from {} to {}, cluster {}, type {}",
+                pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
         updateBeToTablets(pickedTablet, srcBe, destBe,
                 beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
         updateBeToTablets(pickedTablet, srcBe, destBe,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java
index 82c24afd7c5..c895f13f7a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java
@@ -18,7 +18,11 @@
 package org.apache.doris.cloud.catalog;
 
 import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.Setter;
@@ -33,6 +37,27 @@ import java.util.Map;
 public class ComputeGroup {
     private static final Logger LOG = LogManager.getLogger(ComputeGroup.class);
 
+    public static final String BALANCE_TYPE = "balance_type";
+
+    public static final String BALANCE_WARM_UP_TASK_TIMEOUT = 
"balance_warm_up_task_timeout";
+
+    private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new 
ImmutableSet.Builder<String>()
+            .add(BALANCE_TYPE).add(BALANCE_WARM_UP_TASK_TIMEOUT).build();
+
+    private static final Map<String, String> ALL_PROPERTIES_DEFAULT_VALUE_MAP 
= Maps.newHashMap();
+
+    public static final int DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT = 
Config.cloud_pre_heating_time_limit_sec;
+    public static final BalanceTypeEnum DEFAULT_COMPUTE_GROUP_BALANCE_ENUM
+            = 
BalanceTypeEnum.fromString(Config.cloud_warm_up_for_rebalance_type);
+
+    static {
+        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_TYPE, 
DEFAULT_COMPUTE_GROUP_BALANCE_ENUM.getValue());
+        if 
(BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(Config.cloud_warm_up_for_rebalance_type))
 {
+            ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_WARM_UP_TASK_TIMEOUT,
+                    String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+        }
+    }
+
     private enum PolicyTypeEnum {
         ActiveStandby,
     }
@@ -110,6 +135,10 @@ public class ComputeGroup {
     @Setter
     private boolean needRebuildFileCache = false;
 
+    @Getter
+    @Setter
+    private Map<String, String> properties = new 
LinkedHashMap<>(ALL_PROPERTIES_DEFAULT_VALUE_MAP);
+
     public ComputeGroup(String id, String name, ComputeTypeEnum type) {
         this.id = id;
         this.name = name;
@@ -134,6 +163,129 @@ public class ComputeGroup {
         return policy.getStandbyComputeGroup();
     }
 
+    private void validateTimeoutRestriction(Map<String, String> 
inputProperties) throws DdlException {
+        if (!properties.containsKey(BALANCE_TYPE)) {
+            return; // no balance type stored on this group: nothing to restrict
+        }
+        String originalBalanceType = properties.get(BALANCE_TYPE);
+        if 
(BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(originalBalanceType)) {
+            return; // group is already async_warmup, so a timeout may be set
+        }
+
+        if (inputProperties.containsKey(BALANCE_TYPE)
+                && 
BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(inputProperties.get(BALANCE_TYPE)))
 {
+            return; // this request itself switches the group to async_warmup
+        }
+
+        if (inputProperties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) {
+            throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT
+                    + " cannot be set when current " + BALANCE_TYPE + " is " + 
originalBalanceType
+                    + ". Only async_warmup type supports timeout setting.");
+        }
+    }
+
+    /**
+     * Validate one property: unknown keys are rejected first, then non-empty values of known keys are checked.
+     */
+    private static void validateProperty(String key, String value) throws 
DdlException {
+        // Unknown keys are rejected before the empty-value shortcut, so ("bad_key", "") cannot slip through.
+        if (!ALL_PROPERTIES_NAME.contains(key)) {
+            throw new DdlException("Property " + key + " is not supported");
+        }
+        if (value == null || value.isEmpty()) {
+            return;
+        }
+
+        // Validate specific properties
+        if (BALANCE_TYPE.equals(key)) {
+            if (!BalanceTypeEnum.isValid(value)) {
+                throw new DdlException("Property " + BALANCE_TYPE
+                    + " only support without_warmup or async_warmup or sync_warmup or peer_read_async_warmup");
+            }
+        } else if (BALANCE_WARM_UP_TASK_TIMEOUT.equals(key)) {
+            try {
+                int timeout = Integer.parseInt(value);
+                if (timeout <= 0) {
+                    throw new DdlException("Property " + 
BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer");
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException("Property " + 
BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer");
+            }
+        }
+    }
+
+    public void checkProperties(Map<String, String> inputProperties) throws 
DdlException {
+        if (inputProperties == null || inputProperties.isEmpty()) {
+            return; // nothing to validate
+        }
+
+        for (Map.Entry<String, String> entry : inputProperties.entrySet()) {
+            validateProperty(entry.getKey(), entry.getValue());
+        }
+
+        validateTimeoutRestriction(inputProperties); // cross-property rule: timeout vs. balance type
+    }
+
+    public void modifyProperties(Map<String, String> inputProperties) throws 
DdlException {
+        String balanceType = inputProperties.get(BALANCE_TYPE);
+        if (balanceType == null) {
+            return; // request does not touch the balance type: stored properties stay as they are
+        }
+        if (BalanceTypeEnum.WITHOUT_WARMUP.getValue().equals(balanceType)
+                || BalanceTypeEnum.SYNC_WARMUP.getValue().equals(balanceType)
+                || 
BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue().equals(balanceType)) {
+            // these balance types keep no warm-up timeout: drop any BALANCE_WARM_UP_TASK_TIMEOUT stored on the group
+            properties.remove(BALANCE_WARM_UP_TASK_TIMEOUT);
+        } else if 
(BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(balanceType)) {
+            // if BALANCE_WARM_UP_TASK_TIMEOUT exists, it has been validated 
in validateProperty
+            if (!properties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) {
+                properties.put(BALANCE_WARM_UP_TASK_TIMEOUT, 
String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+            }
+        }
+    }
+
+    // Overlay propertiesInMs onto this group, skipping invalid or empty entries (periodic instance status checker).
+    public void setProperties(Map<String, String> propertiesInMs) {
+        if (propertiesInMs == null || propertiesInMs.isEmpty()) {
+            return;
+        }
+
+        for (Map.Entry<String, String> entry : propertiesInMs.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            try {
+                validateProperty(key, value);
+            } catch (DdlException e) {
+                LOG.warn("ignore invalid property. compute group: {}, key: {}, 
value: {}, error: {}",
+                        name, key, value, e.getMessage());
+                continue;
+            }
+
+            if (value != null && !value.isEmpty()) {
+                properties.put(key, value);
+            }
+        }
+    }
+
+    public BalanceTypeEnum getBalanceType() {
+        String balanceType = properties.get(BALANCE_TYPE);
+        BalanceTypeEnum type = BalanceTypeEnum.fromString(balanceType);
+        if (type == null) {
+            return BalanceTypeEnum.ASYNC_WARMUP; // fallback when fromString() yields null
+        }
+        return type;
+    }
+
+    public int getBalanceWarmUpTaskTimeout() {
+        String timeoutStr = properties.get(BALANCE_WARM_UP_TASK_TIMEOUT);
+        try {
+            return Integer.parseInt(timeoutStr); // parseInt(null) also throws NumberFormatException
+        } catch (NumberFormatException e) {
+            return DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT; // property absent or not an integer
+        }
+    }
+
     @Override
     public String toString() {
         Map<String, String> showMap = new LinkedHashMap<>();
@@ -143,6 +295,7 @@ public class ComputeGroup {
         showMap.put("unavailableSince", String.valueOf(unavailableSince));
         showMap.put("availableSince", String.valueOf(availableSince));
         showMap.put("policy", policy == null ? "no_policy" : 
policy.toString());
+        showMap.put("properties", properties.toString());
         Gson gson = new Gson();
         return gson.toJson(showMap);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index de28e19dddc..c429bc57347 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -1658,4 +1658,48 @@ public class CloudSystemInfoService extends 
SystemInfoService {
             LOG.info("alter rename compute group, request: {}, response: {}", 
request, response);
         }
     }
+
+    public void alterComputeGroupProperties(String computeGroupName, 
Map<String, String> properties)
+            throws UserException {
+        String cloudInstanceId = ((CloudEnv) 
Env.getCurrentEnv()).getCloudInstanceId();
+        if (Strings.isNullOrEmpty(cloudInstanceId)) {
+            throw new DdlException("unable to alter compute group properties 
due to empty cloud_instance_id");
+        }
+        String computeGroupId = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
+                .getCloudClusterIdByName(computeGroupName);
+        if (Strings.isNullOrEmpty(computeGroupId)) {
+            LOG.info("alter compute group properties {} not found, unable to 
alter", computeGroupName);
+            throw new DdlException("compute group '" + computeGroupName + "' 
not found, unable to alter properties");
+        }
+
+        ClusterPB clusterPB = ClusterPB.newBuilder()
+                .setClusterId(computeGroupId)
+                .setClusterName(computeGroupName)
+                .setType(ClusterPB.Type.COMPUTE)
+                .putAllProperties(properties)
+                .build();
+
+        Cloud.AlterClusterRequest request = 
Cloud.AlterClusterRequest.newBuilder()
+                .setInstanceId(cloudInstanceId)
+                .setOp(Cloud.AlterClusterRequest.Operation.ALTER_PROPERTIES)
+                .setCluster(clusterPB)
+                .build();
+
+        // response stays null when the RPC itself throws; the finally block logs request and response either way
+        Cloud.AlterClusterResponse response = null;
+        try {
+            response = MetaServiceProxy.getInstance().alterCluster(request);
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                LOG.warn("alter compute group properties not ok, response: 
{}", response);
+                throw new UserException("failed to alter compute group 
properties errorCode: "
+                    + response.getStatus().getCode()
+                    + " msg: " + response.getStatus().getMsg() + " may be you 
can try later");
+            }
+        } catch (RpcException e) {
+            LOG.warn("alter compute group properties rpc exception", e);
+            throw new UserException("failed to alter compute group 
properties", e);
+        } finally {
+            LOG.info("alter compute group properties, request: {}, response: 
{}", request, response);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d6a2cd412a6..f348a1bfeaa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -98,6 +98,7 @@ import 
org.apache.doris.nereids.DorisParser.AliasedQueryContext;
 import org.apache.doris.nereids.DorisParser.AlterCatalogCommentContext;
 import org.apache.doris.nereids.DorisParser.AlterCatalogPropertiesContext;
 import org.apache.doris.nereids.DorisParser.AlterCatalogRenameContext;
+import org.apache.doris.nereids.DorisParser.AlterComputeGroupContext;
 import org.apache.doris.nereids.DorisParser.AlterDatabasePropertiesContext;
 import org.apache.doris.nereids.DorisParser.AlterDatabaseRenameContext;
 import org.apache.doris.nereids.DorisParser.AlterDatabaseSetQuotaContext;
@@ -620,6 +621,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.AlterCatalogPropertiesComma
 import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
@@ -6329,6 +6331,13 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         return new AlterWorkloadGroupCommand(cgName, 
stripQuotes(ctx.name.getText()), properties);
     }
 
+    @Override
+    public LogicalPlan visitAlterComputeGroup(AlterComputeGroupContext ctx) {
+        Map<String, String> properties = ctx.propertyClause() != null
+                ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : 
Maps.newHashMap();
+        return new AlterComputeGroupCommand(stripQuotes(ctx.name.getText()), properties);
+    }
+
     @Override
     public LogicalPlan visitAlterWorkloadPolicy(AlterWorkloadPolicyContext 
ctx) {
         Map<String, String> properties = ctx.propertyClause() != null
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 6fc59b0f584..f50bd047f3a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -206,6 +206,7 @@ public enum PlanType {
     ALTER_STORAGE_POLICY_COMMAND,
     ALTER_STORAGE_VAULT,
     ALTER_WORKLOAD_GROUP_COMMAND,
+    ALTER_COMPUTE_GROUP_COMMAND,
     ALTER_WORKLOAD_POLICY_COMMAND,
     ALTER_DATABASE_RENAME_COMMAND,
     ALTER_DATABASE_SET_DATA_QUOTA_COMMAND,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java
new file mode 100644
index 00000000000..8a295678f48
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.ComputeGroup;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+//import org.apache.commons.lang3.StringUtils;
+import java.util.Map;
+
+/**
+ * Alter-compute-group command: validates the requested properties, then forwards them to the meta service.
+ */
+public class AlterComputeGroupCommand extends AlterCommand {
+    private final String computeGroupName;
+    private final Map<String, String> properties;
+
+    /**
+     * Creates the command for the named compute group and the properties to apply to it.
+     */
+    public AlterComputeGroupCommand(String computeGroupName, Map<String, 
String> properties) {
+        super(PlanType.ALTER_COMPUTE_GROUP_COMMAND);
+        this.computeGroupName = computeGroupName;
+        this.properties = properties;
+    }
+
+    /**
+     * Checks cloud mode, ADMIN privilege, non-empty properties and an existing, non-virtual target group.
+     */
+    public void validate(ConnectContext connectContext) throws UserException {
+        if (Config.isNotCloudMode()) {
+            throw new AnalysisException("Currently, Alter compute group is 
only supported in cloud mode");
+        }
+        // check auth (NOTE(review): reads ConnectContext.get() rather than the connectContext argument)
+        if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+        }
+
+        if (properties == null || properties.isEmpty()) {
+            throw new AnalysisException("Compute Group properties can't be 
empty");
+        }
+
+        CloudSystemInfoService cloudSys = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo());
+        // the target compute group must exist
+        ComputeGroup cg = cloudSys.getComputeGroupByName(computeGroupName);
+        if (cg == null) {
+            throw new AnalysisException("Compute Group " + computeGroupName + 
" does not exist");
+        }
+
+        if (cg.isVirtual()) {
+            throw new AnalysisException("Virtual Compute Group " + 
computeGroupName + " can not be altered");
+        }
+
+        // check compute group's properties can be modified
+        cg.checkProperties(properties);
+        // NOTE(review): this changes the in-memory group before doRun() sends the RPC; confirm that is intended
+        cg.modifyProperties(properties);
+    }
+
+    @Override
+    public void doRun(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        validate(ctx);
+        CloudSystemInfoService cloudSys = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo());
+        // forward the validated properties to the meta service
+        cloudSys.alterComputeGroupProperties(computeGroupName, properties);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitAlterComputeGroupCommand(this, context);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
index da61a861ce6..7ea77b085a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
@@ -59,10 +59,12 @@ import java.util.stream.Collectors;
 public class ShowClustersCommand extends ShowCommand {
     // sql: show clusters;
     public static final ImmutableList<String> CLUSTER_TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            
.add("cluster").add("is_current").add("users").add("backend_num").add("sub_clusters").add("policy").build();
+            .add("cluster").add("is_current").add("users").add("backend_num")
+            .add("sub_clusters").add("policy").add("properties").build();
     // sql: show compute groups;
     public static final ImmutableList<String> COMPUTE_GROUP_TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            
.add("Name").add("IsCurrent").add("Users").add("BackendNum").add("SubComputeGroups").add("Policy").build();
+            .add("Name").add("IsCurrent").add("Users").add("BackendNum")
+            .add("SubComputeGroups").add("Policy").add("Properties").build();
 
     private static final Logger LOG = 
LogManager.getLogger(ShowClustersCommand.class);
     private final boolean isComputeGroup;
@@ -105,13 +107,17 @@ public class ShowClustersCommand extends ShowCommand {
         clusterNameSet.addAll(clusterNames);
 
         for (String clusterName : clusterNameSet) {
-            ArrayList<String> row = Lists.newArrayList(clusterName);
             // current_used, users
             if (!Env.getCurrentEnv().getAccessManager()
                     
.checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), clusterName,
                             PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
                 continue;
             }
+            ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName);
+            if (cg == null) {
+                continue;
+            }
+            ArrayList<String> row = Lists.newArrayList(clusterName);
             String clusterNameFromCtx = "";
             try {
                 clusterNameFromCtx = ctx.getCloudCluster();
@@ -142,16 +148,14 @@ public class ShowClustersCommand extends ShowCommand {
                 rows.add(row);
                 row.add(subClusterNames);
                 row.add(policy);
+                row.add(cg.getProperties().toString());
                 continue;
             }
             // virtual compute group
             // virtual cg backends eq 0
             row.add(String.valueOf(0));
             rows.add(row);
-            ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName);
-            if (cg == null) {
-                continue;
-            }
+
             String activeCluster = cg.getPolicy().getActiveComputeGroup();
             String standbyCluster = cg.getPolicy().getStandbyComputeGroup();
             // first active, second standby
@@ -159,6 +163,7 @@ public class ShowClustersCommand extends ShowCommand {
             row.add(subClusterNames);
             // Policy
             row.add(cg.getPolicy().toString());
+            row.add(cg.getProperties().toString());
         }
 
         return new ShowResultSet(getMetaData(), rows);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index 3a8ff435ead..b663110a5ca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -41,6 +41,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.AlterCatalogPropertiesComma
 import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand;
@@ -808,6 +809,10 @@ public interface CommandVisitor<R, C> {
         return visitCommand(alterWorkloadGroupCommand, context);
     }
 
+    default R visitAlterComputeGroupCommand(AlterComputeGroupCommand 
alterComputeGroupCommand, C context) {
+        return visitCommand(alterComputeGroupCommand, context);
+    }
+
     default R visitAlterWorkloadPolicyCommand(AlterWorkloadPolicyCommand 
alterWorkloadPolicyCommand, C context) {
         return visitCommand(alterWorkloadPolicyCommand, context);
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java
new file mode 100644
index 00000000000..c61f2abefa4
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java
@@ -0,0 +1,341 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.common.DdlException;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class ComputeGroupTest {
+    private ComputeGroup computeGroup;
+
+    @BeforeEach
+    public void setUp() {
+        computeGroup = new ComputeGroup("test_id", "test_group", 
ComputeGroup.ComputeTypeEnum.COMPUTE);
+    }
+
+    @Test
+    public void testCheckPropertiesWithNull() throws DdlException {
+        computeGroup.checkProperties(null);
+        computeGroup.checkProperties(Maps.newHashMap());
+    }
+
+    @Test
+    public void testCheckPropertiesWithValidBalanceType() throws DdlException {
+        // Test valid balance_type values
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        computeGroup.checkProperties(properties);
+
+        properties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        computeGroup.checkProperties(properties);
+
+        properties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.SYNC_WARMUP.getValue());
+        computeGroup.checkProperties(properties);
+
+        properties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue());
+        computeGroup.checkProperties(properties);
+    }
+
+    @Test
+    public void testCheckPropertiesWithInvalidBalanceType() {
+        // Test an invalid balance_type
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(ComputeGroup.BALANCE_TYPE, "invalid_type");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(properties);
+        });
+    }
+
+    @Test
+    public void testCheckPropertiesWithValidTimeout() throws DdlException {
+        // Test valid timeout values
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "300");
+        computeGroup.checkProperties(properties);
+
+        properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "1");
+        computeGroup.checkProperties(properties);
+    }
+
+    @Test
+    public void testCheckPropertiesWithInvalidTimeout() {
+        // Test invalid timeout values
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "-1");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(properties);
+        });
+
+        properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "invalid");
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(properties);
+        });
+    }
+
+    @Test
+    public void testCheckPropertiesWithUnsupportedProperty() {
+        // Test an unsupported property
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("unsupported_property", "value");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(properties);
+        });
+    }
+
+    @Test
+    public void testModifyPropertiesWithDirectSwitch() throws DdlException {
+        // Test the without_warmup type: the timeout should be removed
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+                
String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+
+        // First put a timeout into the group's properties
+        
computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+                
String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+        
Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+
+        computeGroup.modifyProperties(inputProperties);
+
+        // Verify the timeout was removed
+        
Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+    }
+
+    @Test
+    public void testModifyPropertiesWithSyncCache() throws DdlException {
+        // Test the sync_warmup type (called sync cache in the test name): the timeout should be removed
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.SYNC_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+                
String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+
+        // First put a timeout into the group's properties
+        
computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+                
String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+        
Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+
+        computeGroup.modifyProperties(inputProperties);
+
+        // Verify the timeout was removed
+        
Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+    }
+
+    @Test
+    public void testCheckPropertiesWithBalanceTypeTransition() throws 
DdlException {
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testCheckPropertiesWithWarmupCacheToWarmupCache() throws 
DdlException {
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testCheckPropertiesWithDirectSwitchToDirectSwitch() throws 
DdlException {
+        // Test without_warmup -> without_warmup (direct switch in the test name): no timeout needs to be set
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testModifyPropertiesWithWarmupCacheAndExistingTimeout() throws 
DdlException {
+        // Test the async_warmup type with an existing timeout: no default value should be added
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "600");
+
+        // First put a timeout into the group's properties
+        
computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, 
"500");
+        String originalTimeout = 
computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT);
+
+        computeGroup.modifyProperties(inputProperties);
+
+        // Verify the timeout is unchanged: the user already set one, so it must not be overwritten
+        Assertions.assertEquals(originalTimeout, 
computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+    }
+
+    @Test
+    public void testModifyPropertiesWithWarmupCacheAndNoTimeout() throws 
DdlException {
+        // 测试async_warmup类型,不存在timeout,应该添加默认值
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+
+        // 确保properties中没有timeout
+        
computeGroup.getProperties().remove(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT);
+        
Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+
+        computeGroup.modifyProperties(inputProperties);
+        // Verify the default value was added
+        
Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+        
Assertions.assertEquals(String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT),
+                
computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+    }
+
+    @Test
+    public void testModifyPropertiesWithNullBalanceType() throws DdlException {
+        // Test a null balance_type: the properties should not be modified
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put("other_property", "value");
+
+        Map<String, String> originalProperties = 
Maps.newHashMap(computeGroup.getProperties());
+
+        computeGroup.modifyProperties(inputProperties);
+
+        // Verify the properties were not modified
+        Assertions.assertEquals(originalProperties, 
computeGroup.getProperties());
+    }
+
+    @Test
+    public void testModifyPropertiesWithEmptyInput() throws DdlException {
+        // Test empty input: the properties should not be modified
+        Map<String, String> inputProperties = Maps.newHashMap();
+
+        Map<String, String> originalProperties = 
Maps.newHashMap(computeGroup.getProperties());
+
+        computeGroup.modifyProperties(inputProperties);
+
+        // Verify the properties were not modified
+        Assertions.assertEquals(originalProperties, 
computeGroup.getProperties());
+    }
+
+    @Test
+    public void testCheckPropertiesWithSyncCacheToWarmupCache() throws 
DdlException {
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.SYNC_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testValidateTimeoutRestrictionWithNoCurrentBalanceType() 
throws DdlException {
+        // Test the case where no balance_type is currently set
+        computeGroup.getProperties().remove(ComputeGroup.BALANCE_TYPE);
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testValidateTimeoutRestrictionWithCurrentWarmupCache() throws 
DdlException {
+        // Test the case where the current balance_type is async_warmup
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testValidateTimeoutRestrictionWithDirectSwitchToWarmupCache() 
throws DdlException {
+        // Test switching from without_warmup to async_warmup while also setting a timeout
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void 
testValidateTimeoutRestrictionWithSyncCacheToWarmupCacheWithTimeout() throws 
DdlException {
+        // Test switching from sync_warmup to async_warmup while also setting a timeout
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.SYNC_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+        computeGroup.checkProperties(inputProperties);
+    }
+
+    @Test
+    public void testValidateTimeoutRestrictionWithDirectSwitchAndOnlyTimeout() 
{
+        // Current type is without_warmup: setting only a timeout should fail
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(inputProperties);
+        });
+    }
+
+    @Test
+    public void testValidateTimeoutRestrictionWithSyncCacheAndOnlyTimeout() {
+        // Current type is sync_warmup: setting only a timeout should fail
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.SYNC_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(inputProperties);
+        });
+    }
+
+    @Test
+    public void 
testValidateTimeoutRestrictionWithDirectSwitchToSyncCacheAndTimeout() {
+        // Switching from without_warmup to sync_warmup while also setting a timeout should fail
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.SYNC_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(inputProperties);
+        });
+    }
+
+    @Test
+    public void 
testValidateTimeoutRestrictionWithDirectSwitchToSameAndTimeout() {
+        // Switching from without_warmup to without_warmup while also setting a timeout should fail
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+        Assertions.assertThrows(DdlException.class, () -> {
+            computeGroup.checkProperties(inputProperties);
+        });
+    }
+
+    @Test
+    public void testValidateTimeoutRestrictionWithNoInputTimeout() throws 
DdlException {
+        // Test the case where the input contains no timeout
+        computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, 
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+        Map<String, String> inputProperties = Maps.newHashMap();
+        inputProperties.put("other_property", "value");
+        Assertions.assertThrows(DdlException.class, () -> {
+            // Property other_property is not supported
+            computeGroup.checkProperties(inputProperties);
+        });
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java
new file mode 100644
index 00000000000..8ff8b60ffe0
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.backup.CatalogMocker;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.ComputeGroup;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mysql.privilege.AccessControllerManager;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlterComputeGroupCommandTest {
+    private static final String internalCtl = 
InternalCatalog.INTERNAL_CATALOG_NAME;
+    @Mocked
+    private Env env;
+    @Mocked
+    private AccessControllerManager accessControllerManager;
+    @Mocked
+    private InternalCatalog catalog;
+    @Mocked
+    private ConnectContext connectContext;
+    @Mocked
+    private CloudSystemInfoService cloudSystemInfoService;
+    @Mocked
+    private ComputeGroup computeGroup;
+    private Database db;
+
+    private void runBefore() throws Exception {
+        db = CatalogMocker.mockDb();
+        new Expectations() {
+            {
+                Env.getCurrentEnv();
+                minTimes = 0;
+                result = env;
+
+                env.getAccessManager();
+                minTimes = 0;
+                result = accessControllerManager;
+
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+
+                connectContext.isSkipAuth();
+                minTimes = 0;
+                result = true;
+
+                accessControllerManager.checkGlobalPriv(connectContext, 
PrivPredicate.ADMIN);
+                minTimes = 0;
+                result = true;
+            }
+        };
+    }
+
+    @Test
+    public void testValidateNonCloudMode() throws Exception {
+        runBefore();
+        Config.deploy_mode = "non-cloud";
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("balance_type", "async_warmup");
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("test_group", properties);
+        Assertions.assertThrows(UserException.class, () -> 
command.validate(connectContext));
+    }
+
+    @Test
+    public void testValidateAuthFailed() throws Exception {
+        runBefore();
+        Config.deploy_mode = "cloud";
+        new Expectations() {
+            {
+                accessControllerManager.checkGlobalPriv(connectContext, 
PrivPredicate.ADMIN);
+                minTimes = 0;
+                result = false;
+            }
+        };
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("balance_type", "async_warmup");
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("test_group", properties);
+        Assertions.assertThrows(UserException.class, () -> 
command.validate(connectContext));
+    }
+
+    @Test
+    public void testValidateEmptyProperties() throws Exception {
+        runBefore();
+        Config.deploy_mode = "cloud";
+
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("test_group", null);
+        Assertions.assertThrows(UserException.class, () -> 
command.validate(connectContext));
+
+        AlterComputeGroupCommand command2 = new 
AlterComputeGroupCommand("test_group", new HashMap<>());
+        Assertions.assertThrows(UserException.class, () -> 
command2.validate(connectContext));
+    }
+
+    @Test
+    public void testValidateComputeGroupNotExist() throws Exception {
+        runBefore();
+        Config.deploy_mode = "cloud";
+
+        new Expectations() {
+            {
+                env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = cloudSystemInfoService;
+
+                
cloudSystemInfoService.getComputeGroupByName("non_exist_group");
+                minTimes = 0;
+                result = null;
+            }
+        };
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("balance_type", "async_warmup");
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("non_exist_group", properties);
+        Assertions.assertThrows(UserException.class, () -> 
command.validate(connectContext));
+    }
+
+    @Test
+    public void testValidateVirtualComputeGroup() throws Exception {
+        runBefore();
+        Config.deploy_mode = "cloud";
+
+        new Expectations() {
+            {
+                env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = cloudSystemInfoService;
+
+                cloudSystemInfoService.getComputeGroupByName("virtual_group");
+                minTimes = 0;
+                result = computeGroup;
+
+                computeGroup.isVirtual();
+                minTimes = 0;
+                result = true;
+            }
+        };
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("balance_type", "async_warmup");
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("virtual_group", properties);
+        Assertions.assertThrows(UserException.class, () -> 
command.validate(connectContext));
+    }
+
+    @Test
+    public void testValidateInvalidProperties() throws Exception {
+        runBefore();
+        Config.deploy_mode = "cloud";
+
+        new Expectations() {
+            {
+                env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = cloudSystemInfoService;
+
+                cloudSystemInfoService.getComputeGroupByName("test_group");
+                minTimes = 0;
+                result = computeGroup;
+
+                computeGroup.isVirtual();
+                minTimes = 0;
+                result = false;
+
+                computeGroup.checkProperties((Map<String, String>) any);
+                minTimes = 0;
+                result = new DdlException("Invalid property");
+            }
+        };
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("invalid_property", "invalid_value");
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("test_group", properties);
+        Assertions.assertThrows(UserException.class, () -> 
command.validate(connectContext));
+    }
+
+    @Test
+    public void testValidateSuccess() throws Exception {
+        runBefore();
+        Config.deploy_mode = "cloud";
+
+        new Expectations() {
+            {
+                env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = cloudSystemInfoService;
+
+                cloudSystemInfoService.getComputeGroupByName("test_group");
+                minTimes = 0;
+                result = computeGroup;
+
+                computeGroup.isVirtual();
+                minTimes = 0;
+                result = false;
+            }
+        };
+
+        new Expectations() {
+            {
+                computeGroup.checkProperties((Map<String, String>) any);
+                minTimes = 0;
+
+                computeGroup.modifyProperties((Map<String, String>) any);
+                minTimes = 0;
+            }
+        };
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("balance_type", "async_warmup");
+        properties.put("balance_warm_up_task_timeout", "300");
+        AlterComputeGroupCommand command = new 
AlterComputeGroupCommand("test_group", properties);
+        Assertions.assertDoesNotThrow(() -> command.validate(connectContext));
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java
new file mode 100644
index 00000000000..06588301872
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ShowComputeGroupTest extends TestWithFeService {
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+    }
+
+    @Test
+    public void testShowComputeGroupsInCloudMode() throws Exception {
+        Config.deploy_mode = "cloud";
+        ShowClustersCommand command = new ShowClustersCommand(true);
+        ShowResultSetMetaData metaData = command.getMetaData();
+        Assertions.assertNotNull(metaData);
+        List<String> columnNames = metaData.getColumns().stream()
+                .map(Column::getName)
+                .collect(Collectors.toList());
+        Assertions.assertEquals(7, columnNames.size());
+        Assertions.assertEquals("Name", columnNames.get(0));
+        Assertions.assertEquals("IsCurrent", columnNames.get(1));
+        Assertions.assertEquals("Users", columnNames.get(2));
+        Assertions.assertEquals("BackendNum", columnNames.get(3));
+        Assertions.assertEquals("SubComputeGroups", columnNames.get(4));
+        Assertions.assertEquals("Policy", columnNames.get(5));
+        Assertions.assertEquals("Properties", columnNames.get(6));
+    }
+
+    @Test
+    public void testShowComputeGroupsInNonCloudMode() throws Exception {
+        Config.deploy_mode = "not-cloud";
+        ShowClustersCommand command = new ShowClustersCommand(true);
+        Assertions.assertThrows(AnalysisException.class, () -> {
+            command.doRun(connectContext, null);
+        });
+    }
+
+    @Test
+    public void testShowClustersInCloudMode() throws Exception {
+        ShowClustersCommand command = new ShowClustersCommand(false);
+        ShowResultSetMetaData metaData = command.getMetaData();
+        Assertions.assertNotNull(metaData);
+        List<String> columnNames = metaData.getColumns().stream()
+                .map(Column::getName).collect(Collectors.toList());
+        Assertions.assertEquals(7, columnNames.size());
+        Assertions.assertEquals("cluster", columnNames.get(0));
+        Assertions.assertEquals("is_current", columnNames.get(1));
+        Assertions.assertEquals("users", columnNames.get(2));
+        Assertions.assertEquals("backend_num", columnNames.get(3));
+        Assertions.assertEquals("sub_clusters", columnNames.get(4));
+        Assertions.assertEquals("policy", columnNames.get(5));
+        Assertions.assertEquals("properties", columnNames.get(6));
+    }
+
+    @Test
+    public void testShowClustersInNonCloudMode() throws Exception {
+        Config.deploy_mode = "not-cloud";
+        ShowClustersCommand command = new ShowClustersCommand(false);
+        Assertions.assertThrows(AnalysisException.class, () -> {
+            command.doRun(connectContext, null);
+        });
+    }
+}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 8e5794f4787..41322826eb2 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -182,6 +182,7 @@ message ClusterPB {
     optional int64 mtime = 11;
     repeated string cluster_names = 12; // clusters in virtual cluster
     optional ClusterPolicy cluster_policy = 13; // virtual cluster policy
+    map<string, string> properties = 14; // cluster additional properties
 }
 
 message NodeInfoPB {
@@ -1410,6 +1411,7 @@ message AlterClusterRequest {
         UPDATE_CLUSTER_ENDPOINT = 9;
         SET_CLUSTER_STATUS = 10;
         ALTER_VCLUSTER_INFO = 11;
+        ALTER_PROPERTIES = 12;
     }
     optional string instance_id = 1;
     optional string cloud_unique_id = 2; // For auth
diff --git 
a/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy
 
b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy
new file mode 100644
index 00000000000..e0db0c258e7
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_alter_compute_group_properties', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    def findComputeGroup = { clusterName ->
+        def showComputeGroups = sql_return_maparray """SHOW COMPUTE GROUPS"""
+        log.info("SHOW COMPUTE GROUPS result: {}", showComputeGroups)
+        showComputeGroups.find { it.Name == clusterName }
+    }
+
+    def findCluster = { clusterName ->
+        def showCg = sql_return_maparray """SHOW CLUSTERS"""
+        log.info("SHOW CLUSTERS result: {}", showCg)
+        showCg.find { it.cluster == clusterName }
+    }
+
+    docker(options) {
+        String clusterName = "compute_cluster"
+        def showComputeGroup = findComputeGroup(clusterName)
+        def showclusters = findCluster(clusterName)
+        
assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300'))
+        
assertTrue(showclusters.properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300'))
+        sql """ALTER COMPUTE GROUP $clusterName PROPERTIES 
('balance_type'='without_warmup')"""
+        sleep(3 * 1000)
+        showComputeGroup = findComputeGroup(clusterName)
+        showclusters = findCluster(clusterName)
+        
assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup'))
+        
assertTrue(showclusters.properties.contains('balance_type=without_warmup'))
+        try {
+            //  errCode = 2, detailMessage = Property 
balance_warm_up_task_timeout cannot be set when current balance_type is 
without_warmup. Only async_warmup type supports timeout setting.
+            sql """ALTER COMPUTE GROUP $clusterName PROPERTIES 
('balance_warm_up_task_timeout'='6000')"""
+        } catch (Exception e) {
+            logger.info("exception: {}", e.getMessage())
+            assertTrue(e.getMessage().contains("Property 
balance_warm_up_task_timeout cannot be set when current"))
+        } 
+        sql """ALTER COMPUTE GROUP $clusterName PROPERTIES 
('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='6000')"""
+        sleep(3 * 1000)
+        showComputeGroup = findComputeGroup(clusterName)
+        showclusters = findCluster(clusterName)
+        
assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=6000'))
+        
assertTrue(showclusters.properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=6000'))
+        sql """ALTER COMPUTE GROUP $clusterName PROPERTIES 
('balance_type'='without_warmup')"""
+        sleep(3 * 1000)
+        showComputeGroup = findComputeGroup(clusterName)
+        showclusters = findCluster(clusterName)
+        
assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup'))
+        
assertTrue(showclusters.properties.contains('balance_type=without_warmup'))
+        sql """ALTER COMPUTE GROUP $clusterName PROPERTIES 
('balance_type'='async_warmup')"""
+        sleep(3 * 1000)
+        showComputeGroup = findComputeGroup(clusterName)
+        showclusters = findCluster(clusterName)
+        
assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300'))
+        
assertTrue(showclusters.properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300'))
+        sql """ALTER COMPUTE GROUP $clusterName PROPERTIES 
('balance_type'='sync_warmup')"""
+        sleep(3 * 1000)
+        showComputeGroup = findComputeGroup(clusterName)
+        showclusters = findCluster(clusterName)
+        
assertTrue(showComputeGroup.Properties.contains('balance_type=sync_warmup'))
+        
assertTrue(showclusters.properties.contains('balance_type=sync_warmup'))
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy
 
b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy
new file mode 100644
index 00000000000..f96119ec29b
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_use_compute_group_properties', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        'cloud_warm_up_for_rebalance_type=sync_warmup',
+        'cloud_pre_heating_time_limit_sec=30'
+    ]
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    def mergeDirs = { base, add ->
+        base + add.collectEntries { host, hashFiles ->
+            [(host): base[host] ? (base[host] + hashFiles) : hashFiles]
+        }
+    }
+
+    def global_config_cluster = "compute_cluster"
+    def without_warmup_cluster = "without_warmup"
+    def async_warmup_cluster = "async_warmup"
+    def sync_warmup_cluster = "sync_warmup"
+
+    def testCase = { table -> 
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+
+        // alter each cluster different balance type
+        sql """ALTER COMPUTE GROUP $without_warmup_cluster PROPERTIES 
('balance_type'='without_warmup')"""
+        sql """ALTER COMPUTE GROUP $async_warmup_cluster PROPERTIES 
('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='10')"""
+        sql """ALTER COMPUTE GROUP $sync_warmup_cluster PROPERTIES 
('balance_type'='sync_warmup')"""
+
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `v1` VARCHAR(2048)
+            )
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (10, '1'), (20, '2')
+        """
+        sql """
+            insert into $table values (30, '3'), (40, '4')
+        """
+
+        def beforeBalanceEveryClusterCache = [:]
+
+        def clusterNameToBeIdx = [:]
+        clusterNameToBeIdx[global_config_cluster] = [1]
+        clusterNameToBeIdx[without_warmup_cluster] = [2]
+        clusterNameToBeIdx[async_warmup_cluster] = [3]
+        clusterNameToBeIdx[sync_warmup_cluster] = [4]
+
+        // generate primary tablet in each cluster
+        for (clusterName in [global_config_cluster, without_warmup_cluster, 
async_warmup_cluster, sync_warmup_cluster]) {
+            sql """ use @$clusterName """
+            sql """ select * from $table """
+            def beIdxs = clusterNameToBeIdx[clusterName]
+            def bes = []
+            beIdxs.each { beIdx ->
+                def be = cluster.getBeByIndex(beIdx)
+                bes << be
+            }
+            logger.info("clusterName {} be idxs {}, bes {}", clusterName, 
beIdxs, bes)
+
+            // before add be
+            def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+            def beforeGetFromBe = getTabletAndBeHostFromBe(bes)
+            logger.info("before add be fe tablets {}, be tablets {}", 
beforeGetFromFe, beforeGetFromBe)
+            // version 2
+            def beforeCacheDirVersion2 = 
getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+            logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+            // version 3
+            def beforeCacheDirVersion3 = 
getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+            logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+            def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA 
DISTRIBUTION FROM $table"""
+            logger.info("before warm up result {}", beforeWarmUpResult)
+            def merged23CacheDir = [beforeCacheDirVersion2, 
beforeCacheDirVersion3]
+                .inject([:]) { acc, m -> mergeDirs(acc, m) }
+            beforeBalanceEveryClusterCache[clusterName] = [beforeGetFromFe, 
beforeGetFromBe, merged23CacheDir]
+        }
+        logger.info("before balance every cluster cache {}", 
beforeBalanceEveryClusterCache)
+
+        // disable cloud balance
+        setFeConfig('enable_cloud_multi_replica', true)
+        cluster.addBackend(1, global_config_cluster)
+        cluster.addBackend(1, without_warmup_cluster)
+        cluster.addBackend(1, async_warmup_cluster)
+        cluster.addBackend(1, sync_warmup_cluster)
+        
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+        setFeConfig('enable_cloud_multi_replica', false)
+
+        clusterNameToBeIdx[global_config_cluster] = [1, 5]
+        clusterNameToBeIdx[without_warmup_cluster] = [2, 6]
+        clusterNameToBeIdx[async_warmup_cluster] = [3, 7]
+        clusterNameToBeIdx[sync_warmup_cluster] = [4, 8]
+        
+        // sleep 11s, wait balance
+        // and sync_warmup cluster task 10s timeout
+        sleep(11 * 1000)
+
+        def afterBalanceEveryClusterCache = [:]
+
+        for (clusterName in [global_config_cluster, without_warmup_cluster, 
async_warmup_cluster, sync_warmup_cluster]) {
+            sql """ use @$clusterName """
+            sql """ select * from $table """
+            def beIdxs = clusterNameToBeIdx[clusterName]
+            def bes = []
+            beIdxs.each { beIdx ->
+                def be = cluster.getBeByIndex(beIdx)
+                bes << be
+            }
+            logger.info("after add be clusterName {} be idxs {}, bes {}", 
clusterName, beIdxs, bes)
+
+            // after add be
+            def afterGetFromFe = getTabletAndBeHostFromFe(table)
+            def afterGetFromBe = getTabletAndBeHostFromBe(bes)
+            // version 2
+            def afterCacheDirVersion2 = 
getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+            logger.info("cache dir version 2 {}", afterCacheDirVersion2)
+            // version 3
+            def afterCacheDirVersion3 = 
getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+            logger.info("cache dir version 3 {}", afterCacheDirVersion3)
+            def merged23CacheDir = [afterCacheDirVersion2, 
afterCacheDirVersion3]
+                .inject([:]) { acc, m -> mergeDirs(acc, m) }
+            afterBalanceEveryClusterCache[clusterName] = [afterGetFromFe, 
afterGetFromBe, merged23CacheDir]
+            logger.info("after add be clusterName {} fe tablets {}, be tablets 
{}, cache dir {}", clusterName, afterGetFromFe, afterGetFromBe, 
merged23CacheDir)
+        }
+        logger.info("after add be balance every cluster cache {}", 
afterBalanceEveryClusterCache)
+
+        // Compares the values stored under the first two keys of the FE-side map, which is
+        // element 0 of a per-cluster result list [feTablets, beTablets, cacheDirs].
+        //   clusterRet    - one entry of afterBalanceEveryClusterCache
+        //   expectedEqual - true: both values must be equal; false: they must differ
+        // NOTE(review): the map presumably maps tablet -> BE host (it comes from
+        // getTabletAndBeHostFromFe) - confirm against that helper.
+        def assertFirstMapKeys = { clusterRet, expectedEqual ->
+            def feTabletMap = clusterRet[0]
+            def tabletKeys = feTabletMap.keySet().toList()
+            // With fewer than two entries the second lookup would use a null key and the
+            // comparison below would pass or fail vacuously, so fail loudly instead.
+            assert tabletKeys.size() >= 2 : "expected at least 2 entries in FE tablet map, got ${feTabletMap}"
+            def firstValue = feTabletMap[tabletKeys[0]]
+            def secondValue = feTabletMap[tabletKeys[1]]
+            if (expectedEqual) {
+                assert firstValue == secondValue
+            } else {
+                assert firstValue != secondValue
+            }
+        }
+
+        // check afterBalanceEveryClusterCache
+        // fe config cloud_warm_up_for_rebalance_type=sync_warmup
+        def global_config_cluster_ret = 
afterBalanceEveryClusterCache[global_config_cluster]
+        logger.info("global_config_cluster_ret {}", global_config_cluster_ret)
+        // fe tablets not changed
+        assertFirstMapKeys(global_config_cluster_ret, true)
+
+        def without_warmup_cluster_ret = 
afterBalanceEveryClusterCache[without_warmup_cluster]
+        logger.info("without_warmup_cluster_ret {}", 
without_warmup_cluster_ret)
+        // fe tablets has changed
+        assertFirstMapKeys(without_warmup_cluster_ret, false)
+
+        def async_warmup_cluster_ret = 
afterBalanceEveryClusterCache[async_warmup_cluster]
+        logger.info("async_warmup_cluster_ret {}", async_warmup_cluster_ret)
+        // fe tablets has changed, due to task timeout
+        assertFirstMapKeys(async_warmup_cluster_ret, false)
+
+        def sync_warmup_cluster_ret = 
afterBalanceEveryClusterCache[sync_warmup_cluster]
+        logger.info("sync_warmup_cluster_ret {}", sync_warmup_cluster_ret)
+        // fe tablets not changed
+        assertFirstMapKeys(sync_warmup_cluster_ret, true)
+
+        logger.info("success check after balance every cluster cache, 
cluster's balance type is worked")
+    }
+
+    // Bring up the docker cluster, add one backend to each of the three extra compute
+    // groups in the order without_warmup, async_warmup, sync_warmup, then run the
+    // scenario against table test_balance_warm_up_tbl.
+    docker(options) {
+        [without_warmup_cluster, async_warmup_cluster, sync_warmup_cluster].each { computeGroup ->
+            cluster.addBackend(1, computeGroup)
+        }
+        testCase("test_balance_warm_up_tbl")
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 7a18f22bb31..50476800d75 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -29,7 +29,7 @@ suite('test_balance_warm_up', 'docker') {
         'sys_log_verbose_modules=org',
         'heartbeat_interval_second=1',
         'rehash_tablet_after_be_dead_seconds=3600',
-        'enable_cloud_warm_up_for_rebalance=true'
+        'cloud_warm_up_for_rebalance_type=async_warmup'
     ]
     options.beConfigs += [
         'report_tablet_interval_seconds=1',
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy
 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy
new file mode 100644
index 00000000000..19c252fba85
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up_sync_cache', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    // Docker cluster spec for this suite: one FE and one BE in cloud mode, with debug
+    // points enabled.
+    def options = new ClusterOptions()
+    def feOverrides = [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        // FE-wide balance type exercised by this suite
+        'cloud_warm_up_for_rebalance_type=sync_warmup',
+        'cloud_pre_heating_time_limit_sec=30'
+    ]
+    def beOverrides = [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.feConfigs += feOverrides
+    options.beConfigs += beOverrides
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    // Returns a new map holding every entry of `base` overlaid with the entries of `add`.
+    // For a key of `add` whose value in `base` is truthy (non-null, non-empty) the two
+    // values are concatenated with `+`; otherwise the value from `add` is used as is.
+    def mergeDirs = { base, add ->
+        def incoming = add.collectEntries { host, hashFiles ->
+            def known = base[host]
+            [(host): known ? (known + hashFiles) : hashFiles]
+        }
+        base + incoming
+    }
+    
+    // Scenario for the FE-wide setting cloud_warm_up_for_rebalance_type=sync_warmup:
+    //   1. create a 2-bucket table and load two batches (versions 2 and 3);
+    //   2. add a second BE while the BE debug point
+    //      CloudBackendService.check_warm_up_cache_async.return_task_false is enabled, wait
+    //      past cloud_pre_heating_time_limit_sec and verify a BE still reports
+    //      ReplicaNum == 2;
+    //   3. disable the debug point, wait until a BE reports ReplicaNum == 1, then check
+    //      that the cache entries reported for the new BE are a subset of the old BE's
+    //      entries and that each has a matching directory under the new BE's
+    //      storage/file_cache.
+    // `table` is the name of the table to create.
+    def testCase = { table ->
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `v1` VARCHAR(2048)
+            )
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (10, '1'), (20, '2')
+        """
+        sql """
+            insert into $table values (30, '3'), (40, '4')
+        """
+
+        // snapshot before the second BE is added
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        // version 2
+        def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+        // version 3
+        def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+        def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+        logger.info("before warm up result {}", beforeWarmUpResult)
+
+        // disable cloud balance while the BE is added and the debug point is armed
+        setFeConfig('enable_cloud_multi_replica', true)
+        cluster.addBackend(1, "compute_cluster")
+        GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+        setFeConfig('enable_cloud_multi_replica', false)
+
+        sleep(5 * 1000)
+        sql """
+            insert into $table values (50, '4'), (60, '6')
+        """
+        // version 4, new rs after warm up task
+        def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4)
+        logger.info("cache dir version 4 {}", beforeCacheDirVersion4)
+        // merged cache entries of versions 2, 3 and 4 taken before the tablet moves
+        def preBalanceMergedCacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4]
+            .inject([:]) { acc, m -> mergeDirs(acc, m) }
+        logger.info("after version 4 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, preBalanceMergedCacheDir)
+
+        // after cloud_pre_heating_time_limit_sec = 30s
+        sleep(40 * 1000)
+        // check tablet still in old be
+        def beforeWarmUpTaskOkDis = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+        logger.info("before warm up dis result {}", beforeWarmUpTaskOkDis)
+        assert beforeWarmUpTaskOkDis.any { row ->
+            Integer.valueOf((String) row.ReplicaNum) == 2
+        }
+
+        GetDebugPoint().disableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+        // Query once so both rows come from the same result set.
+        // NOTE(review): relies on 'show backends' listing the original BE first - confirm.
+        def backends = sql_return_maparray('show backends')
+        def oldBe = backends.get(0)
+        def newAddBe = backends.get(1)
+        // balance tablet
+        awaitUntil(500) {
+            def afterWarmUpTaskOkResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+            logger.info("after warm up result {}", afterWarmUpTaskOkResult)
+            afterWarmUpTaskOkResult.any { row ->
+                Integer.valueOf((String) row.ReplicaNum) == 1
+            }
+        }
+
+        // from be1 -> be2, warm up this tablet
+        // snapshot after the second BE was added
+        def afterGetFromFe = getTabletAndBeHostFromFe(table)
+        def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        // version 2
+        def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+        // version 3
+        def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+        sleep(5 * 1000)
+        // version 4
+        def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4)
+        logger.info("after cache dir version 4 {}", afterCacheDirVersion4)
+
+        def afterMergedCacheDir = [afterCacheDirVersion2, afterCacheDirVersion3, afterCacheDirVersion4]
+            .inject([:]) { acc, m -> mergeDirs(acc, m) }
+
+        logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+        def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+        logger.info("new add be cache dir {}", newAddBeCacheDir)
+        // Fail with a readable message instead of a NullPointerException when the new BE
+        // has no entry at all.
+        assert newAddBeCacheDir != null && newAddBeCacheDir.size() != 0 : "no cache dir reported for new BE ${newAddBe.Host}: ${afterMergedCacheDir}"
+        assert preBalanceMergedCacheDir[oldBe.Host].containsAll(newAddBeCacheDir)
+
+        def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+        def dataPath = new File("${be.path}/storage/file_cache")
+        logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+        logger.info("Directory exists: {}", dataPath.exists())
+
+        def subDirs = []
+
+        // Recursively collects the names of all directories below `dir` into subDirs.
+        def collectDirs
+        collectDirs = { File dir ->
+            if (dir.exists()) {
+                dir.eachDir { subDir ->
+                    logger.info("Found subdir: {}", subDir.name)
+                    subDirs << subDir.name
+                    collectDirs(subDir)
+                }
+            }
+        }
+
+        collectDirs(dataPath)
+        logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+        // every reported cache entry must be the prefix of some directory name on disk
+        newAddBeCacheDir.each { hashFile ->
+            assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+            "Expected cache file pattern ${hashFile} not found in BE ${newAddBe.Host}'s file_cache directory. " +
+            "Available subdirs: ${subDirs}")
+        }
+    }
+
+    docker(options) {
+        testCase("test_balance_warm_up_sync_tbl")
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy
 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy
new file mode 100644
index 00000000000..0938126d61c
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up_task_abnormal', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    // Docker cluster spec for this suite: one FE and one BE in cloud mode, with debug
+    // points enabled.
+    def options = new ClusterOptions()
+    def feOverrides = [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        // FE-wide balance type exercised by this suite
+        'cloud_warm_up_for_rebalance_type=sync_warmup',
+        'cloud_pre_heating_time_limit_sec=10'
+    ]
+    def beOverrides = [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.feConfigs += feOverrides
+    options.beConfigs += beOverrides
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    // Builds a new map from `base` overlaid with `add`. When `base` already has a truthy
+    // (non-null, non-empty) value for a key of `add`, the two values are joined with `+`;
+    // otherwise the value from `add` is taken unchanged.
+    def mergeDirs = { base, add ->
+        def overlay = add.collectEntries { host, hashFiles ->
+            def merged = base[host] ? (base[host] + hashFiles) : hashFiles
+            [(host): merged]
+        }
+        return base + overlay
+    }
+    
+    // Scenario: the FE-wide balance type is sync_warmup and a BE debug point makes the
+    // warm up check always report false.
+    //   1. create a 2-bucket table, load two batches and confirm a BE reports
+    //      ReplicaNum == 2;
+    //   2. add a second BE with the debug point enabled, wait 15s and confirm a BE still
+    //      reports ReplicaNum == 2 (mapping unchanged);
+    //   3. switch the compute group to balance_type=without_warmup and wait until a BE
+    //      reports ReplicaNum == 1.
+    // `table` is the name of the table to create.
+    def testCase = { table ->
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `v1` VARCHAR(2048)
+            )
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (10, '1'), (20, '2')
+        """
+        sql """
+            insert into $table values (30, '3'), (40, '4')
+        """
+
+        // snapshot before the second BE is added
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        logger.info("before add be fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe)
+        // version 2
+        def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+        // version 3
+        def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+        def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+        logger.info("before warm up result {}", beforeWarmUpResult)
+        assert beforeWarmUpResult.any { row ->
+            Integer.valueOf((String) row.ReplicaNum) == 2
+        }
+
+        // disable cloud balance while the BE is added and the debug point is armed
+        setFeConfig('enable_cloud_multi_replica', true)
+        cluster.addBackend(1, "compute_cluster")
+        // sync warm up task always return false
+        GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+        setFeConfig('enable_cloud_multi_replica', false)
+
+        // wait for some time to make sure warm up task is processed, but mapping is not changed
+        // (a negative check, so a fixed wait is the only option here)
+        sleep(15 * 1000)
+        // check mapping is not changed
+        def afterAddBeResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+        logger.info("after add be result {}", afterAddBeResult)
+        // two be, but 2 replica num is still mapping in old be
+        assert afterAddBeResult.any { row ->
+            Integer.valueOf((String) row.ReplicaNum) == 2
+        }
+
+        // test recover from abnormal
+        sql """ALTER COMPUTE GROUP compute_cluster PROPERTIES ('balance_type'='without_warmup')"""
+
+        // Poll instead of sleeping a fixed 5s: the check succeeds as soon as the tablets
+        // are rebalanced and does not fail spuriously on a slow run.
+        // now mapping is changed to 1 replica in each be
+        awaitUntil(60) {
+            def afterAlterResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+            logger.info("after alter balance policy result {}", afterAlterResult)
+            afterAlterResult.any { row ->
+                Integer.valueOf((String) row.ReplicaNum) == 1
+            }
+        }
+    }
+
+    docker(options) {
+        testCase("test_balance_warm_up_task_abnormal_tbl")
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy 
b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
new file mode 100644
index 00000000000..8ce71606542
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_peer_read_async_warmup', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    // Docker cluster spec for this suite: one FE and one BE in cloud mode, with debug
+    // points enabled.
+    def options = new ClusterOptions()
+    def feOverrides = [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        // FE-wide balance type exercised by this suite
+        'cloud_warm_up_for_rebalance_type=peer_read_async_warmup',
+        // disable Auto Analysis Job Executor
+        'auto_check_statistics_in_minutes=60',
+    ]
+    def beOverrides = [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*',
+    ]
+    options.feConfigs += feOverrides
+    options.beConfigs += beOverrides
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+    
+    // Returns a new map: all entries of `base`, overlaid with the entries of `add`. A key
+    // of `add` whose `base` value is truthy (non-null, non-empty) gets both values joined
+    // with `+`; any other key of `add` keeps the value from `add`.
+    def mergeDirs = { base, add ->
+        def additions = add.collectEntries { host, hashFiles ->
+            def existing = base[host]
+            def value = existing ? (existing + hashFiles) : hashFiles
+            [(host): value]
+        }
+        base + additions
+    }
+
+    // Fetches http://<ip>:<port>/brpc_metrics and returns, as a long, the first run of
+    // digits that follows `name` and whitespace in the response text.
+    // Throws RuntimeException when no such occurrence exists.
+    // `name` is interpolated into the regular expression unescaped.
+    def getBrpcMetrics = {ip, port, name ->
+        def body = new URL("http://${ip}:${port}/brpc_metrics").text
+        def found = java.util.regex.Pattern.compile("${name}\\s+(\\d+)").matcher(body)
+        if (!found.find()) {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+        return found.group(1) as long
+    }
+
+    // Scenario for cloud_warm_up_for_rebalance_type=peer_read_async_warmup:
+    //   1. create a 2-bucket table and load two batches (versions 2 and 3);
+    //   2. add a second BE with the BE debug point
+    //      FileCacheBlockDownloader::download_segment_file_sleep enabled and wait until a
+    //      BE reports ReplicaNum == 1;
+    //   3. load two more batches (versions 4 and 5) and check the cache entries reported
+    //      for the new BE versus the old BE;
+    //   4. run a profiled query and require NumPeerIOTotal > 0 in the profile, a non-zero
+    //      cached_remote_reader_peer_read metric and a zero cached_remote_reader_s3_read
+    //      metric on the new BE.
+    // `table` is the name of the table to create.
+    def testCase = { table ->
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `v1` VARCHAR(2048)
+            )
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (10, '1'), (20, '2')
+        """
+        sql """
+            insert into $table values (30, '3'), (40, '4')
+        """
+
+        // snapshot before the second BE is added
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        // version 2
+        def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+        // version 3
+        def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+        def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+        logger.info("before warm up result {}", beforeWarmUpResult)
+
+        // disable cloud balance while the BE is added and the debug point is armed
+        setFeConfig('enable_cloud_multi_replica', true)
+        cluster.addBackend(1, "compute_cluster")
+        GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader::download_segment_file_sleep", [sleep_time: 50])
+        setFeConfig('enable_cloud_multi_replica', false)
+        awaitUntil(500) {
+            def afterRebalanceResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table"""
+            logger.info("after rebalance result {}", afterRebalanceResult)
+            afterRebalanceResult.any { row ->
+                Integer.valueOf((String) row.ReplicaNum) == 1
+            }
+        }
+
+        sql """
+            insert into $table values (50, '4'), (60, '6')
+        """
+        // version 4, in new be, but not in old be
+        def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4)
+        logger.info("cache dir version 4 {}", beforeCacheDirVersion4)
+
+        sql """
+            insert into $table values (70, '7'), (80, '8')
+        """
+        // version 5, in new be, but not in old be
+        def beforeCacheDirVersion5 = getTabletFileCacheDirFromBe(msHttpPort, table, 5)
+        logger.info("cache dir version 5 {}", beforeCacheDirVersion5)
+
+        def afterMerged2345CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4, beforeCacheDirVersion5]
+            .inject([:]) { acc, m -> mergeDirs(acc, m) }
+        logger.info("after version 2,3,4,5 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged2345CacheDir)
+
+        // Query once so both rows come from the same result set.
+        // NOTE(review): relies on 'show backends' listing the original BE first - confirm.
+        def backends = sql_return_maparray('show backends')
+        def oldBe = backends.get(0)
+        def newAddBe = backends.get(1)
+
+        def newAddBeCacheDir = afterMerged2345CacheDir.get(newAddBe.Host)
+        logger.info("new add be cache dir {}", newAddBeCacheDir)
+        // version 4, 5 (null-guarded so a missing entry fails the assertion, not the JVM call)
+        assertTrue(newAddBeCacheDir != null && newAddBeCacheDir.size() == 2, "new add be should have version 4,5 cache file")
+        // warm up task blocked by debug point, so old be should not have version 4,5 cache file
+        assertFalse(afterMerged2345CacheDir[oldBe.Host].containsAll(newAddBeCacheDir), "old be should not have version 4,5 cache file")
+
+        // The query triggers reading the file cache from the peer
+        profile("test_peer_read_async_warmup_profile") {
+            sql """ set enable_profile = true;"""
+            sql """ set profile_level = 2;"""
+            run {
+                sql """/* test_peer_read_async_warmup_profile */ select * from $table"""
+                sleep(1000)
+            }
+
+            check { profileString, exception ->
+                log.info(profileString)
+                // Sum every number that follows "NumPeerIOTotal:" in the profile text
+                def matcher = (profileString =~ /-  NumPeerIOTotal:\s+(\d+)/)
+                def total = 0
+                while (matcher.find()) {
+                    total += matcher.group(1).toInteger()
+                    logger.info("NumPeerIOTotal: {}", matcher.group(1))
+                }
+                assertTrue(total > 0, "profile should report NumPeerIOTotal > 0, got ${total}")
+            }
+        }
+
+        // peer read cache, so it should read version 2,3 cache file from old be, not s3
+        assertTrue(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"), "new add be should have peer read cache")
+        assertTrue(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"), "new add be should not have s3 read cache")
+    }
+
+    docker(options) {
+        testCase("test_peer_read_async_warmup_tbl")
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy 
b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy
similarity index 98%
rename from 
regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
rename to regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy
index aa35a70e121..c0dc2f9747c 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy
@@ -25,7 +25,7 @@ suite('test_warmup_rebalance_in_cloud', 'multi_cluster, 
docker') {
     def options = new ClusterOptions()
     options.feConfigs += [
         'cloud_cluster_check_interval_second=1',
-        'enable_cloud_warm_up_for_rebalance=true',
+        'cloud_warm_up_for_rebalance_type=async_warmup',
         'cloud_tablet_rebalancer_interval_second=1',
         'cloud_balance_tablet_percent_per_run=0.5',
         'sys_log_verbose_modules=org',


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to