This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a17454c7e6b branch-4.0: [feature](cloud) Support balance sync warm up #56164 (#57533)
a17454c7e6b is described below
commit a17454c7e6b7ea51e57463a926ba9b518dce5b38
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 6 19:53:09 2025 +0800
branch-4.0: [feature](cloud) Support balance sync warm up #56164 (#57533)
Cherry-picked from #56164
Co-authored-by: deardeng <[email protected]>
---
be/src/io/cache/block_file_cache_downloader.cpp | 14 +-
cloud/src/meta-service/meta_service_resource.cpp | 27 +-
.../main/java/org/apache/doris/common/Config.java | 22 +-
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +
.../doris/cloud/catalog/BalanceTypeEnum.java | 69 +++++
.../cloud/catalog/CloudInstanceStatusChecker.java | 53 ++++
.../doris/cloud/catalog/CloudTabletRebalancer.java | 179 +++++++++--
.../apache/doris/cloud/catalog/ComputeGroup.java | 153 +++++++++
.../doris/cloud/system/CloudSystemInfoService.java | 44 +++
.../doris/nereids/parser/LogicalPlanBuilder.java | 9 +
.../apache/doris/nereids/trees/plans/PlanType.java | 1 +
.../plans/commands/AlterComputeGroupCommand.java | 98 ++++++
.../trees/plans/commands/ShowClustersCommand.java | 19 +-
.../trees/plans/visitor/CommandVisitor.java | 5 +
.../doris/cloud/catalog/ComputeGroupTest.java | 341 +++++++++++++++++++++
.../commands/AlterComputeGroupCommandTest.java | 243 +++++++++++++++
.../trees/plans/commands/ShowComputeGroupTest.java | 91 ++++++
gensrc/proto/cloud.proto | 2 +
.../test_alter_compute_group_properties.groovy | 92 ++++++
...est_balance_use_compute_group_properties.groovy | 212 +++++++++++++
.../cloud_p0/balance/test_balance_warm_up.groovy | 2 +-
.../test_balance_warm_up_sync_global_config.groovy | 179 +++++++++++
.../test_balance_warm_up_task_abnormal.groovy | 121 ++++++++
.../balance/test_peer_read_async_warmup.groovy | 169 ++++++++++
.../test_warmup_rebalance.groovy | 2 +-
25 files changed, 2116 insertions(+), 33 deletions(-)
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index f5a2a287f83..46ce90aafcc 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -27,6 +27,7 @@
#include <memory>
#include <mutex>
+#include <unordered_set>
#include <variant>
#include "cloud/cloud_tablet_mgr.h"
@@ -171,6 +172,7 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable
void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
+ std::unordered_set<int64_t> synced_tablets;
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
@@ -183,12 +185,20 @@ void FileCacheBlockDownloader::download_file_cache_block(
} else {
tablet = std::move(res).value();
}
-
+ if (!synced_tablets.contains(meta.tablet_id())) {
+ auto st = tablet->sync_rowsets();
+ if (!st) {
+ // just log failed, try it best
+ LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id() << " err msg: " << st.to_string();
+ }
+ synced_tablets.insert(meta.tablet_id());
+ }
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
if (find_it == id_to_rowset_meta_map.end()) {
LOG(WARNING) << "download_file_cache_block: tablet_id=" <<
meta.tablet_id()
- << "rowset_id not found, rowset_id=" <<
meta.rowset_id();
+ << " rowset_id not found, rowset_id=" <<
meta.rowset_id();
return;
}
diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp
index 6c8add49cc0..77a74abd521 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -2607,7 +2607,7 @@ void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo
});
}
-void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInfo& cluster,
+void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster,
std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
MetaServiceCode& code) {
msg = resource_mgr->update_cluster(
@@ -2694,6 +2694,26 @@ void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInf
});
}
+void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster,
+ std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
+ MetaServiceCode& code) {
+ msg = resource_mgr->update_cluster(
+ instance_id, cluster,
+ [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
+ [&](ClusterPB& c, std::vector<ClusterPB>&) {
+ std::string msg;
+ std::stringstream ss;
+ if (ClusterPB::COMPUTE != c.type()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "just support set COMPUTE cluster status";
+ msg = ss.str();
+ return msg;
+ }
+ *c.mutable_properties() = cluster.cluster.properties();
+ return msg;
+ });
+}
+
void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
const AlterClusterRequest* request,
AlterClusterResponse* response,
@@ -2778,7 +2798,10 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code);
break;
case AlterClusterRequest::ALTER_VCLUSTER_INFO:
- handle_alter_vcluster_Info(instance_id, cluster, resource_mgr(), msg, code);
+ handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code);
+ break;
+ case AlterClusterRequest::ALTER_PROPERTIES:
+ handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code);
break;
default:
code = MetaServiceCode::INVALID_ARGUMENT;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1b386f98588..8d943d5c7e7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3333,8 +3333,26 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int cloud_min_balance_tablet_num_per_run = 2;
- @ConfField(mutable = true, masterOnly = true)
- public static boolean enable_cloud_warm_up_for_rebalance = true;
+ @ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
+ + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
+ + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
+ + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
+ + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer
BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
+ + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+ + "设置compute group维度的balance类型,compute group维度配置优先级更高",
+ "Specify the scaling and warming methods for all Compute groups in a
cloud mode. "
+ + "without_warmup: Directly modify shard mapping, first read from
S3,"
+ + "fastest re-balance but largest fluctuation; "
+ + "async_warmup: Asynchronous warmup, best-effort cache pulling, "
+ + "faster re-balance but possible cache miss; "
+ + "sync_warmup: Synchronous warmup, ensure cache migration
completion, "
+ + "slower re-balance but no cache miss; "
+ + "peer_read_async_warmup: Directly modify shard mapping, first
read from Peer BE, "
+ + "fastest re-balance but may affect other BEs in the same compute
group performance. "
+ + "Note: This is a global FE configuration, you can also use SQL
(ALTER COMPUTE GROUP cg PROPERTIES) "
+ + "to set balance type at compute group level, compute group level
configuration has higher priority"},
+ options = {"without_warmup", "async_warmup", "sync_warmup",
"peer_read_async_warmup"})
+ public static String cloud_warm_up_for_rebalance_type = "async_warmup";
@ConfField(mutable = true, masterOnly = false)
public static String security_checker_class_name = "";
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 14b02187173..38fe0e9247b 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -264,6 +264,8 @@ supportedAlterStatement
| ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause    #alterStorageVault
| ALTER WORKLOAD GROUP name=identifierOrText (FOR computeGroup=identifierOrText)?
properties=propertyClause?    #alterWorkloadGroup
+ | ALTER COMPUTE GROUP name=identifierOrText
+ properties=propertyClause?    #alterComputeGroup
| ALTER CATALOG name=identifier SET PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN    #alterCatalogProperties
| ALTER WORKLOAD POLICY name=identifierOrText
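For reference, a minimal usage sketch of the statement this new alterComputeGroup rule enables; the compute group name cg1 is hypothetical, and the property keys and the async_warmup-only timeout restriction follow ComputeGroup.java later in this patch:

    ALTER COMPUTE GROUP cg1 PROPERTIES ("balance_type" = "sync_warmup");
    ALTER COMPUTE GROUP cg1 PROPERTIES ("balance_type" = "async_warmup", "balance_warm_up_task_timeout" = "600");

A compute-group-level setting like this takes priority over the global FE config cloud_warm_up_for_rebalance_type described above.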
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java
new file mode 100644
index 00000000000..d66e3126d5b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.common.Config;
+
+import lombok.Getter;
+
+/**
+ * Enum for balance type options
+ */
+@Getter
+public enum BalanceTypeEnum {
+ WITHOUT_WARMUP("without_warmup"),
+ ASYNC_WARMUP("async_warmup"),
+ SYNC_WARMUP("sync_warmup"),
+ PEER_READ_ASYNC_WARMUP("peer_read_async_warmup");
+
+ private final String value;
+
+ BalanceTypeEnum(String value) {
+ this.value = value;
+ }
+
+ /**
+ * Parse string value to enum, case-insensitive
+ */
+ public static BalanceTypeEnum fromString(String value) {
+ if (value == null) {
+ return null;
+ }
+ for (BalanceTypeEnum type : BalanceTypeEnum.values()) {
+ if (type.value.equalsIgnoreCase(value)) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Check if the given string is a valid balance type
+ */
+ public static boolean isValid(String value) {
+ return fromString(value) != null;
+ }
+
+ /**
+ * Get the balance type enum from the configuration string
+ */
+ public static BalanceTypeEnum getCloudWarmUpForRebalanceTypeEnum() {
+ return fromString(Config.cloud_warm_up_for_rebalance_type) == null
+ ? ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM : fromString(Config.cloud_warm_up_for_rebalance_type);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
index 044f24d2242..40feb8b787b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
@@ -99,10 +99,63 @@ public class CloudInstanceStatusChecker extends MasterDaemon {
List<Cloud.ClusterPB> virtualClusters = new ArrayList<>();
List<Cloud.ClusterPB> computeClusters = new ArrayList<>();
categorizeClusters(clusters, virtualClusters, computeClusters);
+ handleComputeClusters(computeClusters);
handleVirtualClusters(virtualClusters, computeClusters);
removeObsoleteVirtualGroups(virtualClusters);
}
+ private void handleComputeClusters(List<Cloud.ClusterPB> computeClusters) {
+ for (Cloud.ClusterPB computeClusterInMs : computeClusters) {
+ ComputeGroup computeGroupInFe = cloudSystemInfoService
+ .getComputeGroupById(computeClusterInMs.getClusterId());
+ if (computeGroupInFe == null) {
+ // cluster checker will sync it
+ LOG.info("found compute cluster {} in ms, but not in fe mem, "
+ + "it may be wait cluster checker to sync, ignore it",
+ computeClusterInMs);
+ } else {
+ // exist compute group, check properties changed and update if needed
+ updatePropertiesIfChanged(computeGroupInFe, computeClusterInMs);
+ }
+ }
+ }
+
+ /**
+ * Compare properties between compute cluster in MS and compute group in FE,
+ * update only the changed key-value pairs to avoid unnecessary updates.
+ */
+ private void updatePropertiesIfChanged(ComputeGroup computeGroupInFe, Cloud.ClusterPB computeClusterInMs) {
+ Map<String, String> propertiesInMs = computeClusterInMs.getPropertiesMap();
+ Map<String, String> propertiesInFe = computeGroupInFe.getProperties();
+
+ if (propertiesInMs == null || propertiesInMs.isEmpty()) {
+ return;
+ }
+ Map<String, String> changedProperties = new HashMap<>();
+
+ // Check for changed or new properties
+ for (Map.Entry<String, String> entry : propertiesInMs.entrySet()) {
+ String key = entry.getKey();
+ String valueInMs = entry.getValue();
+ String valueInFe = propertiesInFe.get(key);
+
+ if (valueInFe != null && valueInFe.equalsIgnoreCase(valueInMs)) {
+ continue;
+ }
+ changedProperties.put(key, valueInMs);
+
+ LOG.debug("Property changed for compute group {}: {} = {} (was:
{})",
+ computeGroupInFe.getName(), key, valueInMs, valueInFe);
+ }
+
+ // Only update if there are actual changes
+ if (!changedProperties.isEmpty()) {
+ LOG.info("Updating properties for compute group {}: {}",
+ computeGroupInFe.getName(), changedProperties);
+ computeGroupInFe.setProperties(changedProperties);
+ }
+ }
+
private void categorizeClusters(List<Cloud.ClusterPB> clusters,
List<Cloud.ClusterPB> virtualClusters,
List<Cloud.ClusterPB> computeClusters) {
for (Cloud.ClusterPB cluster : clusters) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index d8d6b712b2e..a33667f0f64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -112,6 +112,54 @@ public class CloudTabletRebalancer extends MasterDaemon {
private CloudSystemInfoService cloudSystemInfoService;
+ private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
+
+ /**
+ * Get the current balance type for a compute group, falling back to global balance type if not found
+ */
+ private BalanceTypeEnum getCurrentBalanceType(String clusterId) {
+ ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId);
+ if (cg == null) {
+ LOG.debug("compute group not found, use global balance type, id {}", clusterId);
+ return globalBalanceTypeEnum;
+ }
+
+ BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType();
+ if (isComputeGroupBalanceChanged(clusterId)) {
+ return computeGroupBalanceType;
+ }
+ return globalBalanceTypeEnum;
+ }
+
+ /**
+ * Get the current task timeout for a compute group, falling back to global timeout if not found
+ */
+ private int getCurrentTaskTimeout(String clusterId) {
+ ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId);
+ if (cg == null) {
+ return Config.cloud_pre_heating_time_limit_sec;
+ }
+
+ int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout();
+ if (isComputeGroupBalanceChanged(clusterId)) {
+ return computeGroupTimeout;
+ }
+
+ return Config.cloud_pre_heating_time_limit_sec;
+ }
+
+ private boolean isComputeGroupBalanceChanged(String clusterId) {
+ ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId);
+ if (cg == null) {
+ return false;
+ }
+
+ BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType();
+ int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout();
+ return computeGroupBalanceType != ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM
+ || computeGroupTimeout != ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT;
+ }
+
public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) {
super("cloud tablet rebalancer", Config.cloud_tablet_rebalancer_interval_second * 1000);
this.cloudSystemInfoService = cloudSystemInfoService;
@@ -239,6 +287,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
LOG.info("cloud tablet rebalance begin");
long start = System.currentTimeMillis();
+ globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
buildClusterToBackendMap();
if (!completeRouteInfo()) {
@@ -417,10 +466,25 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void checkInflightWarmUpCacheAsync() {
Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, List<InfightTask>>();
+ Set<InfightTablet> invalidTasks = new HashSet<>();
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
+ String clusterId = entry.getKey().getClusterId();
+ BalanceTypeEnum balanceTypeEnum = getCurrentBalanceType(clusterId);
+ if (balanceTypeEnum == BalanceTypeEnum.WITHOUT_WARMUP) {
+ // no need check warmup cache async
+ invalidTasks.add(entry.getKey());
+ continue;
+ }
beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
}
+ invalidTasks.forEach(key -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove inflight warmup task tablet {} cluster {} no
need warmup",
+ key.getTabletId(), key.getClusterId());
+ }
+ tabletToInfightTask.remove(key);
+ });
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
@@ -432,6 +496,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
destBackend = null;
}
if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) {
+ // dest backend not exist or dead too long, need remove all inflight tasks in this dest backend
List<InfightTablet> toRemove = new LinkedList<>();
for (InfightTask task : entry.getValue()) {
for (InfightTablet key : tabletToInfightTask.keySet()) {
@@ -447,10 +512,12 @@ public class CloudTabletRebalancer extends MasterDaemon {
continue;
}
if (!destBackend.isAlive()) {
+ // dest backend dead, dead time smaller than rehash_tablet_after_be_dead_seconds, wait next time
continue;
}
List<Long> tablets = entry.getValue().stream()
.map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
+ // check dest backend whether warmup cache done
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
if (taskDone == null) {
LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {},
inFight tasks {}",
@@ -461,17 +528,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
InfightTask task = tabletToInfightTask
.getOrDefault(new InfightTablet(result.getKey(), clusterId), null);
- if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp
- > Config.cloud_pre_heating_time_limit_sec)) {
- if (!result.getValue()) {
- LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
- }
- updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
- if (LOG.isDebugEnabled()) {
- LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId());
- }
- tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));
- }
+ handleWarmupCompletion(task, clusterId, result.getValue(), result.getKey(), infos);
}
}
long oldSize = infos.size();
@@ -855,6 +912,70 @@ public class CloudTabletRebalancer extends MasterDaemon {
return null;
}
+ private void handleWarmupCompletion(InfightTask task, String clusterId, boolean isDone, long tabletId,
+ List<UpdateCloudReplicaInfo> infos) {
+ if (task == null) {
+ LOG.warn("cannot find inflight task for tablet {}-{}", clusterId,
tabletId);
+ return;
+ }
+ boolean shouldUpdateMapping = false;
+ BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
+ LOG.debug("cluster id {}, balance type {}, tabletId {}, ", clusterId,
currentBalanceType, tabletId);
+
+ switch (currentBalanceType) {
+ case ASYNC_WARMUP: {
+ int currentTaskTimeout = getCurrentTaskTimeout(clusterId);
+ boolean timeExceeded = System.currentTimeMillis() / 1000 - task.startTimestamp > currentTaskTimeout;
+ LOG.debug("tablet {}-{} warmup cache isDone {} timeExceeded {}",
+ clusterId, tabletId, isDone, timeExceeded);
+ if (isDone || timeExceeded) {
+ if (!isDone) {
+ // timeout but not done, not normal, info log
+ LOG.info("{}-{} warmup cache timeout {}, forced to
change the mapping",
+ clusterId, tabletId, currentTaskTimeout);
+ } else {
+ // done, normal
+ LOG.debug("{}-{} warmup cache done, change the
mapping", clusterId, tabletId);
+ }
+ shouldUpdateMapping = true;
+ }
+ break;
+ }
+ case SYNC_WARMUP: {
+ if (isDone) {
+ // done, normal
+ LOG.debug("{} sync cache done, change the mapping",
tabletId);
+ shouldUpdateMapping = true;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ if (!shouldUpdateMapping) {
+ return;
+ }
+
+ updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove tablet {}-{}", clusterId,
task.pickedTablet.getId());
+ }
+ tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));
+
+ if (BalanceTypeEnum.SYNC_WARMUP.equals(currentBalanceType)) {
+ try {
+ // send sync cache rpc again, ignore the result, the best effort to sync some new data
+ sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
+ } catch (Exception e) {
+ LOG.warn("Failed to preheat tablet {} from {} to {}, "
+ + "help msg turn off fe config
enable_cloud_warm_up_for_rebalance",
+ task.pickedTablet.getId(), task.srcBe, task.destBe, e);
+ }
+ }
+ }
+
private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe,
Map<Long, Set<Tablet>> globalBeToTablets,
Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
@@ -1050,6 +1171,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
long avgNum = totalTabletsNum / beNum;
long transferNum = calculateTransferNum(avgNum);
+ BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
+ LOG.debug("balance type {}, be num {}, total tablets num {}, avg num
{}, transfer num {}",
+ currentBalanceType, beNum, totalTabletsNum, avgNum,
transferNum);
+
for (int i = 0; i < transferNum; i++) {
TransferPairInfo pairInfo = new TransferPairInfo();
if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
@@ -1069,17 +1194,34 @@ public class CloudTabletRebalancer extends MasterDaemon {
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
- if (Config.enable_cloud_warm_up_for_rebalance && srcBackend != null && srcBackend.isAlive()) {
- if (isConflict(srcBe, destBe, cloudReplica, balanceType,
- futurePartitionToTablets, futureBeToTabletsInTable)) {
+ if ((BalanceTypeEnum.WITHOUT_WARMUP.equals(currentBalanceType)
+ || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType))
+ && srcBackend != null && srcBackend.isAlive()) {
+ // direct switch, update fe meta directly, not send preheating task
+ if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) {
continue;
}
- preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets);
+ transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos);
+ if (BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) {
+ LOG.debug("directly switch {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe,
+ clusterId);
+ // send sync cache rpc, best effort
+ try {
+ sendPreHeatingRpc(pickedTablet, srcBe, destBe);
+ } catch (Exception e) {
+ LOG.debug("Failed to preheat tablet {} from {} to {}, "
+ + "directly policy, just ignore the error",
+ pickedTablet.getId(), srcBe, destBe, e);
+ return;
+ }
+ }
} else {
- if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) {
+ // cache warm up
+ if (isConflict(srcBe, destBe, cloudReplica, balanceType,
+ futurePartitionToTablets, futureBeToTabletsInTable)) {
continue;
}
- transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos);
+ preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets);
}
}
}
@@ -1147,7 +1289,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId,
BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
- LOG.info("transfer {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
+ LOG.info("transfer {} from {} to {}, cluster {}, type {}",
+ pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
updateBeToTablets(pickedTablet, srcBe, destBe,
beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
updateBeToTablets(pickedTablet, srcBe, destBe,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java
index 82c24afd7c5..c895f13f7a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java
@@ -18,7 +18,11 @@
package org.apache.doris.cloud.catalog;
import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.Setter;
@@ -33,6 +37,27 @@ import java.util.Map;
public class ComputeGroup {
private static final Logger LOG = LogManager.getLogger(ComputeGroup.class);
+ public static final String BALANCE_TYPE = "balance_type";
+
+ public static final String BALANCE_WARM_UP_TASK_TIMEOUT = "balance_warm_up_task_timeout";
+
+ private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
+ .add(BALANCE_TYPE).add(BALANCE_WARM_UP_TASK_TIMEOUT).build();
+
+ private static final Map<String, String> ALL_PROPERTIES_DEFAULT_VALUE_MAP = Maps.newHashMap();
+
+ public static final int DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT = Config.cloud_pre_heating_time_limit_sec;
+ public static final BalanceTypeEnum DEFAULT_COMPUTE_GROUP_BALANCE_ENUM
+ = BalanceTypeEnum.fromString(Config.cloud_warm_up_for_rebalance_type);
+
+ static {
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_TYPE, DEFAULT_COMPUTE_GROUP_BALANCE_ENUM.getValue());
+ if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(Config.cloud_warm_up_for_rebalance_type)) {
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_WARM_UP_TASK_TIMEOUT,
+ String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+ }
+ }
+
private enum PolicyTypeEnum {
ActiveStandby,
}
@@ -110,6 +135,10 @@ public class ComputeGroup {
@Setter
private boolean needRebuildFileCache = false;
+ @Getter
+ @Setter
+ private Map<String, String> properties = new LinkedHashMap<>(ALL_PROPERTIES_DEFAULT_VALUE_MAP);
+
public ComputeGroup(String id, String name, ComputeTypeEnum type) {
this.id = id;
this.name = name;
@@ -134,6 +163,129 @@ public class ComputeGroup {
return policy.getStandbyComputeGroup();
}
+ private void validateTimeoutRestriction(Map<String, String> inputProperties) throws DdlException {
+ if (!properties.containsKey(BALANCE_TYPE)) {
+ return;
+ }
+ String originalBalanceType = properties.get(BALANCE_TYPE);
+ if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(originalBalanceType)) {
+ return;
+ }
+
+ if (inputProperties.containsKey(BALANCE_TYPE)
+ && BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(inputProperties.get(BALANCE_TYPE))) {
+ return;
+ }
+
+ if (inputProperties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) {
+ throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT
+ + " cannot be set when current " + BALANCE_TYPE + " is " +
originalBalanceType
+ + ". Only async_warmup type supports timeout setting.");
+ }
+ }
+
+ /**
+ * Validate a single property key-value pair.
+ */
+ private static void validateProperty(String key, String value) throws DdlException {
+ if (value == null || value.isEmpty()) {
+ return;
+ }
+
+ if (!ALL_PROPERTIES_NAME.contains(key)) {
+ throw new DdlException("Property " + key + " is not supported");
+ }
+
+ // Validate specific properties
+ if (BALANCE_TYPE.equals(key)) {
+ if (!BalanceTypeEnum.isValid(value)) {
+ throw new DdlException("Property " + BALANCE_TYPE
+ + " only support without_warmup or async_warmup or
sync_warmup");
+ }
+ } else if (BALANCE_WARM_UP_TASK_TIMEOUT.equals(key)) {
+ try {
+ int timeout = Integer.parseInt(value);
+ if (timeout <= 0) {
+ throw new DdlException("Property " +
BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer");
+ }
+ } catch (NumberFormatException e) {
+ throw new DdlException("Property " +
BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer");
+ }
+ }
+ }
+
+ public void checkProperties(Map<String, String> inputProperties) throws DdlException {
+ if (inputProperties == null || inputProperties.isEmpty()) {
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : inputProperties.entrySet()) {
+ validateProperty(entry.getKey(), entry.getValue());
+ }
+
+ validateTimeoutRestriction(inputProperties);
+ }
+
+ public void modifyProperties(Map<String, String> inputProperties) throws DdlException {
+ String balanceType = inputProperties.get(BALANCE_TYPE);
+ if (balanceType == null) {
+ return;
+ }
+ if (BalanceTypeEnum.WITHOUT_WARMUP.getValue().equals(balanceType)
+ || BalanceTypeEnum.SYNC_WARMUP.getValue().equals(balanceType)
+ || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue().equals(balanceType)) {
+ // delete BALANCE_WARM_UP_TASK_TIMEOUT if exists
+ properties.remove(BALANCE_WARM_UP_TASK_TIMEOUT);
+ } else if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(balanceType)) {
+ // if BALANCE_WARM_UP_TASK_TIMEOUT exists, it has been validated in validateProperty
+ if (!properties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) {
+ properties.put(BALANCE_WARM_UP_TASK_TIMEOUT, String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+ }
+ }
+ }
+
+ // set properties, just set in periodic instance status checker
+ public void setProperties(Map<String, String> propertiesInMs) {
+ if (propertiesInMs == null || propertiesInMs.isEmpty()) {
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : propertiesInMs.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ try {
+ validateProperty(key, value);
+ } catch (DdlException e) {
+ LOG.warn("ignore invalid property. compute group: {}, key: {},
value: {}, error: {}",
+ name, key, value, e.getMessage());
+ continue;
+ }
+
+ if (value != null && !value.isEmpty()) {
+ properties.put(key, value);
+ }
+ }
+ }
+
+ public BalanceTypeEnum getBalanceType() {
+ String balanceType = properties.get(BALANCE_TYPE);
+ BalanceTypeEnum type = BalanceTypeEnum.fromString(balanceType);
+ if (type == null) {
+ return BalanceTypeEnum.ASYNC_WARMUP;
+ }
+ return type;
+ }
+
+ public int getBalanceWarmUpTaskTimeout() {
+ String timeoutStr = properties.get(BALANCE_WARM_UP_TASK_TIMEOUT);
+ try {
+ return Integer.parseInt(timeoutStr);
+ } catch (NumberFormatException e) {
+ return DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT;
+ }
+ }
+
@Override
public String toString() {
Map<String, String> showMap = new LinkedHashMap<>();
@@ -143,6 +295,7 @@ public class ComputeGroup {
showMap.put("unavailableSince", String.valueOf(unavailableSince));
showMap.put("availableSince", String.valueOf(availableSince));
showMap.put("policy", policy == null ? "no_policy" :
policy.toString());
+ showMap.put("properties", properties.toString());
Gson gson = new Gson();
return gson.toJson(showMap);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index de28e19dddc..c429bc57347 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -1658,4 +1658,48 @@ public class CloudSystemInfoService extends SystemInfoService {
LOG.info("alter rename compute group, request: {}, response: {}",
request, response);
}
}
+
+ public void alterComputeGroupProperties(String computeGroupName, Map<String, String> properties)
+ throws UserException {
+ String cloudInstanceId = ((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId();
+ if (Strings.isNullOrEmpty(cloudInstanceId)) {
+ throw new DdlException("unable to alter compute group properties due to empty cloud_instance_id");
+ }
+ String computeGroupId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCloudClusterIdByName(computeGroupName);
+ if (Strings.isNullOrEmpty(computeGroupId)) {
+ LOG.info("alter compute group properties {} not found, unable to alter", computeGroupName);
+ throw new DdlException("compute group '" + computeGroupName + "' not found, unable to alter properties");
+ }
+
+ ClusterPB clusterPB = ClusterPB.newBuilder()
+ .setClusterId(computeGroupId)
+ .setClusterName(computeGroupName)
+ .setType(ClusterPB.Type.COMPUTE)
+ .putAllProperties(properties)
+ .build();
+
+ Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder()
+ .setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())
+ .setOp(Cloud.AlterClusterRequest.Operation.ALTER_PROPERTIES)
+ .setCluster(clusterPB)
+ .build();
+
+
+ Cloud.AlterClusterResponse response = null;
+ try {
+ response = MetaServiceProxy.getInstance().alterCluster(request);
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("alter compute group properties not ok, response:
{}", response);
+ throw new UserException("failed to alter compute group
properties errorCode: "
+ + response.getStatus().getCode()
+ + " msg: " + response.getStatus().getMsg() + " may be you
can try later");
+ }
+ } catch (RpcException e) {
+ LOG.warn("alter compute group properties rpc exception");
+ throw new UserException("failed to alter compute group
properties", e);
+ } finally {
+ LOG.info("alter compute group properties, request: {}, response:
{}", request, response);
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d6a2cd412a6..f348a1bfeaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -98,6 +98,7 @@ import org.apache.doris.nereids.DorisParser.AliasedQueryContext;
import org.apache.doris.nereids.DorisParser.AlterCatalogCommentContext;
import org.apache.doris.nereids.DorisParser.AlterCatalogPropertiesContext;
import org.apache.doris.nereids.DorisParser.AlterCatalogRenameContext;
+import org.apache.doris.nereids.DorisParser.AlterComputeGroupContext;
import org.apache.doris.nereids.DorisParser.AlterDatabasePropertiesContext;
import org.apache.doris.nereids.DorisParser.AlterDatabaseRenameContext;
import org.apache.doris.nereids.DorisParser.AlterDatabaseSetQuotaContext;
@@ -620,6 +621,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterCatalogPropertiesComma
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
@@ -6329,6 +6331,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return new AlterWorkloadGroupCommand(cgName, stripQuotes(ctx.name.getText()), properties);
}
+ @Override
+ public LogicalPlan visitAlterComputeGroup(AlterComputeGroupContext ctx) {
+ Map<String, String> properties = ctx.propertyClause() != null
+ ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap();
+ return new AlterComputeGroupCommand(ctx.name.getText(), properties);
+ }
+
@Override
public LogicalPlan visitAlterWorkloadPolicy(AlterWorkloadPolicyContext ctx) {
Map<String, String> properties = ctx.propertyClause() != null
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 6fc59b0f584..f50bd047f3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -206,6 +206,7 @@ public enum PlanType {
ALTER_STORAGE_POLICY_COMMAND,
ALTER_STORAGE_VAULT,
ALTER_WORKLOAD_GROUP_COMMAND,
+ ALTER_COMPUTE_GROUP_COMMAND,
ALTER_WORKLOAD_POLICY_COMMAND,
ALTER_DATABASE_RENAME_COMMAND,
ALTER_DATABASE_SET_DATA_QUOTA_COMMAND,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java
new file mode 100644
index 00000000000..8a295678f48
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.ComputeGroup;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+//import org.apache.commons.lang3.StringUtils;
+import java.util.Map;
+
+/**
+ * alter compute group command
+ */
+public class AlterComputeGroupCommand extends AlterCommand {
+ private final String computeGroupName;
+ private final Map<String, String> properties;
+
+ /**
+ * constructor
+ */
+ public AlterComputeGroupCommand(String computeGroupName, Map<String, String> properties) {
+ super(PlanType.ALTER_COMPUTE_GROUP_COMMAND);
+ this.computeGroupName = computeGroupName;
+ this.properties = properties;
+ }
+
+ /**
+ * validate
+ */
+ public void validate(ConnectContext connectContext) throws UserException {
+ if (Config.isNotCloudMode()) {
+ throw new AnalysisException("Currently, Alter compute group is
only supported in cloud mode");
+ }
+ // check auth
+ if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new AnalysisException("Compute Group properties can't be
empty");
+ }
+
+ CloudSystemInfoService cloudSys = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
+ // check compute group exist
+ ComputeGroup cg = cloudSys.getComputeGroupByName(computeGroupName);
+ if (cg == null) {
+ throw new AnalysisException("Compute Group " + computeGroupName +
" does not exist");
+ }
+
+ if (cg.isVirtual()) {
+ throw new AnalysisException("Virtual Compute Group " +
computeGroupName + " can not be altered");
+ }
+
+ // check compute group's properties can be modified
+ cg.checkProperties(properties);
+
+ cg.modifyProperties(properties);
+ }
+
+ @Override
+ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
+ validate(ctx);
+ CloudSystemInfoService cloudSys = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
+ // send rpc to ms
+ cloudSys.alterComputeGroupProperties(computeGroupName, properties);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitAlterComputeGroupCommand(this, context);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
index da61a861ce6..7ea77b085a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java
@@ -59,10 +59,12 @@ import java.util.stream.Collectors;
public class ShowClustersCommand extends ShowCommand {
// sql: show clusters;
public static final ImmutableList<String> CLUSTER_TITLE_NAMES = new ImmutableList.Builder<String>()
- .add("cluster").add("is_current").add("users").add("backend_num").add("sub_clusters").add("policy").build();
+ .add("cluster").add("is_current").add("users").add("backend_num")
+ .add("sub_clusters").add("policy").add("properties").build();
// sql: show compute groups;
public static final ImmutableList<String> COMPUTE_GROUP_TITLE_NAMES = new ImmutableList.Builder<String>()
- .add("Name").add("IsCurrent").add("Users").add("BackendNum").add("SubComputeGroups").add("Policy").build();
+ .add("Name").add("IsCurrent").add("Users").add("BackendNum")
+ .add("SubComputeGroups").add("Policy").add("Properties").build();
private static final Logger LOG = LogManager.getLogger(ShowClustersCommand.class);
private final boolean isComputeGroup;
@@ -105,13 +107,17 @@ public class ShowClustersCommand extends ShowCommand {
clusterNameSet.addAll(clusterNames);
for (String clusterName : clusterNameSet) {
- ArrayList<String> row = Lists.newArrayList(clusterName);
// current_used, users
if (!Env.getCurrentEnv().getAccessManager()
.checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), clusterName,
PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
continue;
}
+ ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName);
+ if (cg == null) {
+ continue;
+ }
+ ArrayList<String> row = Lists.newArrayList(clusterName);
String clusterNameFromCtx = "";
try {
clusterNameFromCtx = ctx.getCloudCluster();
@@ -142,16 +148,14 @@ public class ShowClustersCommand extends ShowCommand {
rows.add(row);
row.add(subClusterNames);
row.add(policy);
+ row.add(cg.getProperties().toString());
continue;
}
// virtual compute group
// virtual cg backends eq 0
row.add(String.valueOf(0));
rows.add(row);
- ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName);
- if (cg == null) {
- continue;
- }
+
String activeCluster = cg.getPolicy().getActiveComputeGroup();
String standbyCluster = cg.getPolicy().getStandbyComputeGroup();
// first active, second standby
@@ -159,6 +163,7 @@ public class ShowClustersCommand extends ShowCommand {
row.add(subClusterNames);
// Policy
row.add(cg.getPolicy().toString());
+ row.add(cg.getProperties().toString());
}
return new ShowResultSet(getMetaData(), rows);
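With the extra column added above, SHOW COMPUTE GROUPS (and SHOW CLUSTERS) is expected to expose the per-group properties; a hedged sketch, with column names taken from COMPUTE_GROUP_TITLE_NAMES and the Properties cell being the map printed from ComputeGroup.getProperties():

    SHOW COMPUTE GROUPS;
    -- columns: Name | IsCurrent | Users | BackendNum | SubComputeGroups | Policy | Properties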
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index 3a8ff435ead..b663110a5ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterCatalogPropertiesComma
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand;
@@ -808,6 +809,10 @@ public interface CommandVisitor<R, C> {
return visitCommand(alterWorkloadGroupCommand, context);
}
+ default R visitAlterComputeGroupCommand(AlterComputeGroupCommand alterComputeGroupCommand, C context) {
+ return visitCommand(alterComputeGroupCommand, context);
+ }
+
default R visitAlterWorkloadPolicyCommand(AlterWorkloadPolicyCommand alterWorkloadPolicyCommand, C context) {
return visitCommand(alterWorkloadPolicyCommand, context);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java
new file mode 100644
index 00000000000..c61f2abefa4
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java
@@ -0,0 +1,341 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.common.DdlException;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class ComputeGroupTest {
+ private ComputeGroup computeGroup;
+
+ @BeforeEach
+ public void setUp() {
+ computeGroup = new ComputeGroup("test_id", "test_group", ComputeGroup.ComputeTypeEnum.COMPUTE);
+ }
+
+ @Test
+ public void testCheckPropertiesWithNull() throws DdlException {
+ computeGroup.checkProperties(null);
+ computeGroup.checkProperties(Maps.newHashMap());
+ }
+
+ @Test
+ public void testCheckPropertiesWithValidBalanceType() throws DdlException {
+ // Test valid balance_type values
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ computeGroup.checkProperties(properties);
+
+ properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ computeGroup.checkProperties(properties);
+
+ properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue());
+ computeGroup.checkProperties(properties);
+
+ properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue());
+ computeGroup.checkProperties(properties);
+ }
+
+ @Test
+ public void testCheckPropertiesWithInvalidBalanceType() {
+ // Test an invalid balance_type
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(ComputeGroup.BALANCE_TYPE, "invalid_type");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(properties);
+ });
+ }
+
+ @Test
+ public void testCheckPropertiesWithValidTimeout() throws DdlException {
+ // Test valid timeout values
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "300");
+ computeGroup.checkProperties(properties);
+
+ properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "1");
+ computeGroup.checkProperties(properties);
+ }
+
+ @Test
+ public void testCheckPropertiesWithInvalidTimeout() {
+ // Test invalid timeout values
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "-1");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(properties);
+ });
+
+ properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "invalid");
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(properties);
+ });
+ }
+
+ @Test
+ public void testCheckPropertiesWithUnsupportedProperty() {
+ // Test an unsupported property
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("unsupported_property", "value");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(properties);
+ });
+ }
+
+ @Test
+ public void testModifyPropertiesWithDirectSwitch() throws DdlException {
+ // Test the without_warmup type; the timeout should be removed
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+ String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+
+ // First put the timeout into properties
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+ String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+ Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+
+ computeGroup.modifyProperties(inputProperties);
+
+ // Verify the timeout was removed
+ Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+ }
+
+ @Test
+ public void testModifyPropertiesWithSyncCache() throws DdlException {
+ // Test the sync_cache type; the timeout should be removed
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+ String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+
+ // First put the timeout into properties
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT,
+ String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
+ Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+
+ computeGroup.modifyProperties(inputProperties);
+
+ // Verify the timeout was removed
+ Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+ }
+
+ @Test
+ public void testCheckPropertiesWithBalanceTypeTransition() throws DdlException {
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testCheckPropertiesWithWarmupCacheToWarmupCache() throws DdlException {
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testCheckPropertiesWithDirectSwitchToDirectSwitch() throws DdlException {
+ // Test a transition from direct_switch to direct_switch; no timeout needs to be set
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testModifyPropertiesWithWarmupCacheAndExistingTimeout() throws DdlException {
+ // Test the async_warmup type with an existing timeout; the default value should not be added
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "600");
+
+ // First put the timeout into properties
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+ String originalTimeout = computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT);
+
+ computeGroup.modifyProperties(inputProperties);
+
+ // Verify the timeout was not modified; a timeout already set by the user must not be overwritten
+ Assertions.assertEquals(originalTimeout, computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+ }
+
+ @Test
+ public void testModifyPropertiesWithWarmupCacheAndNoTimeout() throws DdlException {
+ // Test the async_warmup type with no timeout; the default value should be added
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+
+ // Make sure properties contains no timeout
+ computeGroup.getProperties().remove(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT);
+ Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+
+ computeGroup.modifyProperties(inputProperties);
+ // Verify the default value was added
+ Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+ Assertions.assertEquals(String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT),
+ computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT));
+ }
+
+ @Test
+ public void testModifyPropertiesWithNullBalanceType() throws DdlException {
+ // Test a null balance_type; properties should not be modified
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put("other_property", "value");
+
+ Map<String, String> originalProperties = Maps.newHashMap(computeGroup.getProperties());
+
+ computeGroup.modifyProperties(inputProperties);
+
+ // Verify properties were not modified
+ Assertions.assertEquals(originalProperties, computeGroup.getProperties());
+ }
+
+ @Test
+ public void testModifyPropertiesWithEmptyInput() throws DdlException {
+ // Test empty input; properties should not be modified
+ Map<String, String> inputProperties = Maps.newHashMap();
+
+ Map<String, String> originalProperties = Maps.newHashMap(computeGroup.getProperties());
+
+ computeGroup.modifyProperties(inputProperties);
+
+ // Verify properties were not modified
+ Assertions.assertEquals(originalProperties, computeGroup.getProperties());
+ }
+
+ @Test
+ public void testCheckPropertiesWithSyncCacheToWarmupCache() throws DdlException {
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testValidateTimeoutRestrictionWithNoCurrentBalanceType() throws DdlException {
+ // Test the case where no balance_type is currently set
+ computeGroup.getProperties().remove(ComputeGroup.BALANCE_TYPE);
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testValidateTimeoutRestrictionWithCurrentWarmupCache() throws
DdlException {
+ // Test the case where the current balance_type is async_warmup
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testValidateTimeoutRestrictionWithDirectSwitchToWarmupCache()
throws DdlException {
+ // Test switching from without_warmup to async_warmup while setting a timeout
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void
testValidateTimeoutRestrictionWithSyncCacheToWarmupCacheWithTimeout() throws
DdlException {
+ // Test switching from sync_warmup to async_warmup while setting a timeout
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.SYNC_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.ASYNC_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+ computeGroup.checkProperties(inputProperties);
+ }
+
+ @Test
+ public void testValidateTimeoutRestrictionWithDirectSwitchAndOnlyTimeout()
{
+ // When the current type is without_warmup, setting only the timeout should fail
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(inputProperties);
+ });
+ }
+
+ @Test
+ public void testValidateTimeoutRestrictionWithSyncCacheAndOnlyTimeout() {
+ // When the current type is sync_warmup, setting only the timeout should fail
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.SYNC_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(inputProperties);
+ });
+ }
+
+ @Test
+ public void
testValidateTimeoutRestrictionWithDirectSwitchToSyncCacheAndTimeout() {
+ // Switching from without_warmup to sync_warmup while setting a timeout should fail
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.SYNC_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(inputProperties);
+ });
+ }
+
+ @Test
+ public void
testValidateTimeoutRestrictionWithDirectSwitchToSameAndTimeout() {
+ // Keeping without_warmup unchanged while setting a timeout should fail
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500");
+
+ Assertions.assertThrows(DdlException.class, () -> {
+ computeGroup.checkProperties(inputProperties);
+ });
+ }
+
+ @Test
+ public void testValidateTimeoutRestrictionWithNoInputTimeout() throws
DdlException {
+ // Test input without a timeout
+ computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE,
BalanceTypeEnum.WITHOUT_WARMUP.getValue());
+ Map<String, String> inputProperties = Maps.newHashMap();
+ inputProperties.put("other_property", "value");
+ Assertions.assertThrows(DdlException.class, () -> {
+ // Property other_property is not supported
+ computeGroup.checkProperties(inputProperties);
+ });
+ }
+}
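
The ComputeGroupTest cases above pin down three rules: balance_warm_up_task_timeout is accepted only when the effective balance_type (the input value if present, otherwise the current one) is async_warmup or no balance_type is known at all; a switch to async_warmup fills in a default timeout only if none is set; and an existing timeout is never overwritten. A minimal standalone sketch of those rules follows, with invented names rather than the real ComputeGroup API:

import java.util.Map;

// Minimal sketch of the property rules asserted by ComputeGroupTest; the class,
// constants, and exception type here are illustrative, not the FE's real code.
final class BalancePropertyRulesSketch {
    static final String BALANCE_TYPE = "balance_type";
    static final String WARM_UP_TIMEOUT = "balance_warm_up_task_timeout";
    static final String DEFAULT_TIMEOUT = "300"; // assumed default, mirroring the regression suites

    // Reject a timeout unless the effective type (input if present, else current) is async_warmup.
    static void check(Map<String, String> current, Map<String, String> input) {
        String target = input.containsKey(BALANCE_TYPE)
                ? input.get(BALANCE_TYPE) : current.get(BALANCE_TYPE);
        if (input.containsKey(WARM_UP_TIMEOUT)
                && target != null && !"async_warmup".equals(target)) {
            throw new IllegalArgumentException(
                    WARM_UP_TIMEOUT + " can only be set when balance_type is async_warmup");
        }
    }

    // Apply a balance_type change; async_warmup gets the default timeout only if none exists yet.
    static void modify(Map<String, String> props, Map<String, String> input) {
        String type = input.get(BALANCE_TYPE);
        if (type == null) {
            return; // no balance_type in the input: leave properties untouched
        }
        props.put(BALANCE_TYPE, type);
        if ("async_warmup".equals(type)) {
            props.putIfAbsent(WARM_UP_TIMEOUT, DEFAULT_TIMEOUT); // never overwrite a user-set value
        }
    }
}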
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java
new file mode 100644
index 00000000000..8ff8b60ffe0
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.backup.CatalogMocker;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.ComputeGroup;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mysql.privilege.AccessControllerManager;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlterComputeGroupCommandTest {
+ private static final String internalCtl =
InternalCatalog.INTERNAL_CATALOG_NAME;
+ @Mocked
+ private Env env;
+ @Mocked
+ private AccessControllerManager accessControllerManager;
+ @Mocked
+ private InternalCatalog catalog;
+ @Mocked
+ private ConnectContext connectContext;
+ @Mocked
+ private CloudSystemInfoService cloudSystemInfoService;
+ @Mocked
+ private ComputeGroup computeGroup;
+ private Database db;
+
+ private void runBefore() throws Exception {
+ db = CatalogMocker.mockDb();
+ new Expectations() {
+ {
+ Env.getCurrentEnv();
+ minTimes = 0;
+ result = env;
+
+ env.getAccessManager();
+ minTimes = 0;
+ result = accessControllerManager;
+
+ ConnectContext.get();
+ minTimes = 0;
+ result = connectContext;
+
+ connectContext.isSkipAuth();
+ minTimes = 0;
+ result = true;
+
+ accessControllerManager.checkGlobalPriv(connectContext,
PrivPredicate.ADMIN);
+ minTimes = 0;
+ result = true;
+ }
+ };
+ }
+
+ @Test
+ public void testValidateNonCloudMode() throws Exception {
+ runBefore();
+ Config.deploy_mode = "non-cloud";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("balance_type", "async_warmup");
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("test_group", properties);
+ Assertions.assertThrows(UserException.class, () ->
command.validate(connectContext));
+ }
+
+ @Test
+ public void testValidateAuthFailed() throws Exception {
+ runBefore();
+ Config.deploy_mode = "cloud";
+ new Expectations() {
+ {
+ accessControllerManager.checkGlobalPriv(connectContext,
PrivPredicate.ADMIN);
+ minTimes = 0;
+ result = false;
+ }
+ };
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("balance_type", "async_warmup");
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("test_group", properties);
+ Assertions.assertThrows(UserException.class, () ->
command.validate(connectContext));
+ }
+
+ @Test
+ public void testValidateEmptyProperties() throws Exception {
+ runBefore();
+ Config.deploy_mode = "cloud";
+
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("test_group", null);
+ Assertions.assertThrows(UserException.class, () ->
command.validate(connectContext));
+
+ AlterComputeGroupCommand command2 = new
AlterComputeGroupCommand("test_group", new HashMap<>());
+ Assertions.assertThrows(UserException.class, () ->
command2.validate(connectContext));
+ }
+
+ @Test
+ public void testValidateComputeGroupNotExist() throws Exception {
+ runBefore();
+ Config.deploy_mode = "cloud";
+
+ new Expectations() {
+ {
+ env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = cloudSystemInfoService;
+
+
cloudSystemInfoService.getComputeGroupByName("non_exist_group");
+ minTimes = 0;
+ result = null;
+ }
+ };
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("balance_type", "async_warmup");
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("non_exist_group", properties);
+ Assertions.assertThrows(UserException.class, () ->
command.validate(connectContext));
+ }
+
+ @Test
+ public void testValidateVirtualComputeGroup() throws Exception {
+ runBefore();
+ Config.deploy_mode = "cloud";
+
+ new Expectations() {
+ {
+ env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = cloudSystemInfoService;
+
+ cloudSystemInfoService.getComputeGroupByName("virtual_group");
+ minTimes = 0;
+ result = computeGroup;
+
+ computeGroup.isVirtual();
+ minTimes = 0;
+ result = true;
+ }
+ };
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("balance_type", "async_warmup");
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("virtual_group", properties);
+ Assertions.assertThrows(UserException.class, () ->
command.validate(connectContext));
+ }
+
+ @Test
+ public void testValidateInvalidProperties() throws Exception {
+ runBefore();
+ Config.deploy_mode = "cloud";
+
+ new Expectations() {
+ {
+ env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = cloudSystemInfoService;
+
+ cloudSystemInfoService.getComputeGroupByName("test_group");
+ minTimes = 0;
+ result = computeGroup;
+
+ computeGroup.isVirtual();
+ minTimes = 0;
+ result = false;
+
+ computeGroup.checkProperties((Map<String, String>) any);
+ minTimes = 0;
+ result = new DdlException("Invalid property");
+ }
+ };
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("invalid_property", "invalid_value");
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("test_group", properties);
+ Assertions.assertThrows(UserException.class, () ->
command.validate(connectContext));
+ }
+
+ @Test
+ public void testValidateSuccess() throws Exception {
+ runBefore();
+ Config.deploy_mode = "cloud";
+
+ new Expectations() {
+ {
+ env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = cloudSystemInfoService;
+
+ cloudSystemInfoService.getComputeGroupByName("test_group");
+ minTimes = 0;
+ result = computeGroup;
+
+ computeGroup.isVirtual();
+ minTimes = 0;
+ result = false;
+ }
+ };
+
+ new Expectations() {
+ {
+ computeGroup.checkProperties((Map<String, String>) any);
+ minTimes = 0;
+
+ computeGroup.modifyProperties((Map<String, String>) any);
+ minTimes = 0;
+ }
+ };
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("balance_type", "async_warmup");
+ properties.put("balance_warm_up_task_timeout", "300");
+ AlterComputeGroupCommand command = new
AlterComputeGroupCommand("test_group", properties);
+ Assertions.assertDoesNotThrow(() -> command.validate(connectContext));
+ }
+}
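
AlterComputeGroupCommandTest above exercises the validation path of the new ALTER COMPUTE GROUP command: it must run in cloud mode with ADMIN privilege, the property map must be non-empty, the target compute group must exist and must not be virtual, and the group's own checkProperties gate runs last. A hedged outline of that order, with placeholder types rather than the real FE classes:

import java.util.Map;

// Hypothetical outline of the validation order implied by AlterComputeGroupCommandTest;
// ComputeGroupView and the messages below are placeholders, not the FE's real API.
final class AlterComputeGroupValidationSketch {
    interface ComputeGroupView {
        boolean isVirtual();
        void checkProperties(Map<String, String> props) throws Exception;
    }

    static void validate(String groupName, Map<String, String> props,
                         boolean cloudMode, boolean hasAdminPriv,
                         ComputeGroupView group) throws Exception {
        if (!cloudMode) {
            throw new Exception("ALTER COMPUTE GROUP is only supported in cloud mode");
        }
        if (!hasAdminPriv) {
            throw new Exception("ADMIN privilege is required");
        }
        if (props == null || props.isEmpty()) {
            throw new Exception("properties cannot be empty");
        }
        if (group == null) {
            throw new Exception("compute group " + groupName + " does not exist");
        }
        if (group.isVirtual()) {
            throw new Exception("cannot alter a virtual compute group");
        }
        group.checkProperties(props); // modifyProperties is applied later, when the command executes
    }
}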
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java
new file mode 100644
index 00000000000..06588301872
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ShowComputeGroupTest extends TestWithFeService {
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ }
+
+ @Test
+ public void testShowComputeGroupsInCloudMode() throws Exception {
+ Config.deploy_mode = "cloud";
+ ShowClustersCommand command = new ShowClustersCommand(true);
+ ShowResultSetMetaData metaData = command.getMetaData();
+ Assertions.assertNotNull(metaData);
+ List<String> columnNames = metaData.getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(7, columnNames.size());
+ Assertions.assertEquals("Name", columnNames.get(0));
+ Assertions.assertEquals("IsCurrent", columnNames.get(1));
+ Assertions.assertEquals("Users", columnNames.get(2));
+ Assertions.assertEquals("BackendNum", columnNames.get(3));
+ Assertions.assertEquals("SubComputeGroups", columnNames.get(4));
+ Assertions.assertEquals("Policy", columnNames.get(5));
+ Assertions.assertEquals("Properties", columnNames.get(6));
+ }
+
+ @Test
+ public void testShowComputeGroupsInNonCloudMode() throws Exception {
+ Config.deploy_mode = "not-cloud";
+ ShowClustersCommand command = new ShowClustersCommand(true);
+ Assertions.assertThrows(AnalysisException.class, () -> {
+ command.doRun(connectContext, null);
+ });
+ }
+
+ @Test
+ public void testShowClustersInCloudMode() throws Exception {
+ ShowClustersCommand command = new ShowClustersCommand(false);
+ ShowResultSetMetaData metaData = command.getMetaData();
+ Assertions.assertNotNull(metaData);
+ List<String> columnNames = metaData.getColumns().stream()
+ .map(Column::getName).collect(Collectors.toList());
+ Assertions.assertEquals(7, columnNames.size());
+ Assertions.assertEquals("cluster", columnNames.get(0));
+ Assertions.assertEquals("is_current", columnNames.get(1));
+ Assertions.assertEquals("users", columnNames.get(2));
+ Assertions.assertEquals("backend_num", columnNames.get(3));
+ Assertions.assertEquals("sub_clusters", columnNames.get(4));
+ Assertions.assertEquals("policy", columnNames.get(5));
+ Assertions.assertEquals("properties", columnNames.get(6));
+ }
+
+ @Test
+ public void testShowClustersInNonCloudMode() throws Exception {
+ Config.deploy_mode = "not-cloud";
+ ShowClustersCommand command = new ShowClustersCommand(false);
+ Assertions.assertThrows(AnalysisException.class, () -> {
+ command.doRun(connectContext, null);
+ });
+ }
+}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 8e5794f4787..41322826eb2 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -182,6 +182,7 @@ message ClusterPB {
optional int64 mtime = 11;
repeated string cluster_names = 12; // clusters in virtual cluster
optional ClusterPolicy cluster_policy = 13; // virtual cluster policy
+ map<string, string> properties = 14; // additional cluster properties
}
message NodeInfoPB {
@@ -1410,6 +1411,7 @@ message AlterClusterRequest {
UPDATE_CLUSTER_ENDPOINT = 9;
SET_CLUSTER_STATUS = 10;
ALTER_VCLUSTER_INFO = 11;
+ ALTER_PROPERTIES = 12;
}
optional string instance_id = 1;
optional string cloud_unique_id = 2; // For auth
diff --git
a/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy
b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy
new file mode 100644
index 00000000000..e0db0c258e7
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_alter_compute_group_properties', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def findComputeGroup = { clusterName ->
+ def showComputeGroups = sql_return_maparray """SHOW COMPUTE GROUPS"""
+ log.info("SHOW COMPUTE GROUPS result: {}", showComputeGroups)
+ showComputeGroups.find { it.Name == clusterName }
+ }
+
+ def findCluster = { clusterName ->
+ def showCg = sql_return_maparray """SHOW CLUSTERS"""
+ log.info("SHOW CLUSTERS result: {}", showCg)
+ showCg.find { it.cluster == clusterName }
+ }
+
+ docker(options) {
+ String clusterName = "compute_cluster"
+ def showComputeGroup = findComputeGroup(clusterName)
+ def showclusters = findCluster(clusterName)
+
assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup,
balance_warm_up_task_timeout=300'))
+
assertTrue(showclusters.properties.contains('balance_type=async_warmup,
balance_warm_up_task_timeout=300'))
+ sql """ALTER COMPUTE GROUP $clusterName PROPERTIES
('balance_type'='without_warmup')"""
+ sleep(3 * 1000)
+ showComputeGroup = findComputeGroup(clusterName)
+ showclusters = findCluster(clusterName)
+
assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup'))
+
assertTrue(showclusters.properties.contains('balance_type=without_warmup'))
+ try {
+ // errCode = 2, detailMessage = Property
balance_warm_up_task_timeout cannot be set when current balance_type is
without_warmup. Only async_warmup type supports timeout setting.
+ sql """ALTER COMPUTE GROUP $clusterName PROPERTIES
('balance_warm_up_task_timeout'='6000')"""
+ } catch (Exception e) {
+ logger.info("exception: {}", e.getMessage())
+ assertTrue(e.getMessage().contains("Property
balance_warm_up_task_timeout cannot be set when current"))
+ }
+ sql """ALTER COMPUTE GROUP $clusterName PROPERTIES
('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='6000')"""
+ sleep(3 * 1000)
+ showComputeGroup = findComputeGroup(clusterName)
+ showclusters = findCluster(clusterName)
+
assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup,
balance_warm_up_task_timeout=6000'))
+
assertTrue(showclusters.properties.contains('balance_type=async_warmup,
balance_warm_up_task_timeout=6000'))
+ sql """ALTER COMPUTE GROUP $clusterName PROPERTIES
('balance_type'='without_warmup')"""
+ sleep(3 * 1000)
+ showComputeGroup = findComputeGroup(clusterName)
+ showclusters = findCluster(clusterName)
+
assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup'))
+
assertTrue(showclusters.properties.contains('balance_type=without_warmup'))
+ sql """ALTER COMPUTE GROUP $clusterName PROPERTIES
('balance_type'='async_warmup')"""
+ sleep(3 * 1000)
+ showComputeGroup = findComputeGroup(clusterName)
+ showclusters = findCluster(clusterName)
+
assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup,
balance_warm_up_task_timeout=300'))
+
assertTrue(showclusters.properties.contains('balance_type=async_warmup,
balance_warm_up_task_timeout=300'))
+ sql """ALTER COMPUTE GROUP $clusterName PROPERTIES
('balance_type'='sync_warmup')"""
+ sleep(3 * 1000)
+ showComputeGroup = findComputeGroup(clusterName)
+ showclusters = findCluster(clusterName)
+
assertTrue(showComputeGroup.Properties.contains('balance_type=sync_warmup'))
+
assertTrue(showclusters.properties.contains('balance_type=sync_warmup'))
+ }
+}
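
The assertions in test_alter_compute_group_properties.groovy match the Properties column of SHOW COMPUTE GROUPS / SHOW CLUSTERS against strings such as 'balance_type=async_warmup, balance_warm_up_task_timeout=300'. A small sketch of how such a column string can be rendered from a properties map (illustrative only, not the FE's actual formatter):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative rendering of a properties map into the comma-separated
// "key=value" string that the groovy assertions above match against.
final class PropertiesColumnSketch {
    static String render(Map<String, String> props) {
        return props.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("balance_type", "async_warmup");
        props.put("balance_warm_up_task_timeout", "300");
        // prints: balance_type=async_warmup, balance_warm_up_task_timeout=300
        System.out.println(render(props));
    }
}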
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy
new file mode 100644
index 00000000000..f96119ec29b
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_use_compute_group_properties', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=sync_warmup',
+ 'cloud_pre_heating_time_limit_sec=30'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def mergeDirs = { base, add ->
+ base + add.collectEntries { host, hashFiles ->
+ [(host): base[host] ? (base[host] + hashFiles) : hashFiles]
+ }
+ }
+
+ def global_config_cluster = "compute_cluster"
+ def without_warmup_cluster = "without_warmup"
+ def async_warmup_cluster = "async_warmup"
+ def sync_warmup_cluster = "sync_warmup"
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+
+ // set a different balance type on each cluster
+ sql """ALTER COMPUTE GROUP $without_warmup_cluster PROPERTIES
('balance_type'='without_warmup')"""
+ sql """ALTER COMPUTE GROUP $async_warmup_cluster PROPERTIES
('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='10')"""
+ sql """ALTER COMPUTE GROUP $sync_warmup_cluster PROPERTIES
('balance_type'='sync_warmup')"""
+
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ def beforeBalanceEveryClusterCache = [:]
+
+ def clusterNameToBeIdx = [:]
+ clusterNameToBeIdx[global_config_cluster] = [1]
+ clusterNameToBeIdx[without_warmup_cluster] = [2]
+ clusterNameToBeIdx[async_warmup_cluster] = [3]
+ clusterNameToBeIdx[sync_warmup_cluster] = [4]
+
+ // generate primary tablet in each cluster
+ for (clusterName in [global_config_cluster, without_warmup_cluster,
async_warmup_cluster, sync_warmup_cluster]) {
+ sql """ use @$clusterName """
+ sql """ select * from $table """
+ def beIdxs = clusterNameToBeIdx[clusterName]
+ def bes = []
+ beIdxs.each { beIdx ->
+ def be = cluster.getBeByIndex(beIdx)
+ bes << be
+ }
+ logger.info("clusterName {} be idxs {}, bes {}", clusterName,
beIdxs, bes)
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe = getTabletAndBeHostFromBe(bes)
+ logger.info("before add be fe tablets {}, be tablets {}",
beforeGetFromFe, beforeGetFromBe)
+ // version 2
+ def beforeCacheDirVersion2 =
getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 =
getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+ def merged23CacheDir = [beforeCacheDirVersion2,
beforeCacheDirVersion3]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+ beforeBalanceEveryClusterCache[clusterName] = [beforeGetFromFe,
beforeGetFromBe, merged23CacheDir]
+ }
+ logger.info("before balance every cluster cache {}",
beforeBalanceEveryClusterCache)
+
+ // disable cloud balance
+ setFeConfig('enable_cloud_multi_replica', true)
+ cluster.addBackend(1, global_config_cluster)
+ cluster.addBackend(1, without_warmup_cluster)
+ cluster.addBackend(1, async_warmup_cluster)
+ cluster.addBackend(1, sync_warmup_cluster)
+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+ setFeConfig('enable_cloud_multi_replica', false)
+
+ clusterNameToBeIdx[global_config_cluster] = [1, 5]
+ clusterNameToBeIdx[without_warmup_cluster] = [2, 6]
+ clusterNameToBeIdx[async_warmup_cluster] = [3, 7]
+ clusterNameToBeIdx[sync_warmup_cluster] = [4, 8]
+
+ // sleep 11s, wait balance
+ // and sync_warmup cluster task 10s timeout
+ sleep(11 * 1000)
+
+ def afterBalanceEveryClusterCache = [:]
+
+ for (clusterName in [global_config_cluster, without_warmup_cluster,
async_warmup_cluster, sync_warmup_cluster]) {
+ sql """ use @$clusterName """
+ sql """ select * from $table """
+ def beIdxs = clusterNameToBeIdx[clusterName]
+ def bes = []
+ beIdxs.each { beIdx ->
+ def be = cluster.getBeByIndex(beIdx)
+ bes << be
+ }
+ logger.info("after add be clusterName {} be idxs {}, bes {}",
clusterName, beIdxs, bes)
+
+ // after add be
+ def afterGetFromFe = getTabletAndBeHostFromFe(table)
+ def afterGetFromBe = getTabletAndBeHostFromBe(bes)
+ // version 2
+ def afterCacheDirVersion2 =
getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+ logger.info("cache dir version 2 {}", afterCacheDirVersion2)
+ // version 3
+ def afterCacheDirVersion3 =
getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+ logger.info("cache dir version 3 {}", afterCacheDirVersion3)
+ def merged23CacheDir = [afterCacheDirVersion2,
afterCacheDirVersion3]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+ afterBalanceEveryClusterCache[clusterName] = [afterGetFromFe,
afterGetFromBe, merged23CacheDir]
+ logger.info("after add be clusterName {} fe tablets {}, be tablets
{}, cache dir {}", clusterName, afterGetFromFe, afterGetFromBe,
merged23CacheDir)
+ }
+ logger.info("after add be balance every cluster cache {}",
afterBalanceEveryClusterCache)
+
+ // assert first map keys
+ def assertFirstMapKeys = { clusterRet, expectedEqual ->
+ def firstMap = clusterRet[0]
+ def keys = firstMap.keySet().toList()
+ if (expectedEqual) {
+ assert firstMap[keys[0]] == firstMap[keys[1]]
+ } else {
+ assert firstMap[keys[0]] != firstMap[keys[1]]
+ }
+ }
+
+ // check afterBalanceEveryClusterCache
+ // fe config cloud_warm_up_for_rebalance_type=sync_warmup
+ def global_config_cluster_ret =
afterBalanceEveryClusterCache[global_config_cluster]
+ logger.info("global_config_cluster_ret {}", global_config_cluster_ret)
+ // fe tablets not changed
+ assertFirstMapKeys(global_config_cluster_ret, true)
+
+ def without_warmup_cluster_ret =
afterBalanceEveryClusterCache[without_warmup_cluster]
+ logger.info("without_warmup_cluster_ret {}",
without_warmup_cluster_ret)
+ // fe tablets have changed
+ assertFirstMapKeys(without_warmup_cluster_ret, false)
+
+ def async_warmup_cluster_ret =
afterBalanceEveryClusterCache[async_warmup_cluster]
+ logger.info("async_warmup_cluster_ret {}", async_warmup_cluster_ret)
+ // fe tablets have changed, due to task timeout
+ assertFirstMapKeys(async_warmup_cluster_ret, false)
+
+ def sync_warmup_cluster_ret =
afterBalanceEveryClusterCache[sync_warmup_cluster]
+ logger.info("sync_warmup_cluster_ret {}", sync_warmup_cluster_ret)
+ // fe tablets not changed
+ assertFirstMapKeys(sync_warmup_cluster_ret, true)
+
+ logger.info("success check after balance every cluster cache,
cluster's balance type is worked")
+ }
+
+ docker(options) {
+ cluster.addBackend(1, without_warmup_cluster)
+ cluster.addBackend(1, async_warmup_cluster)
+ cluster.addBackend(1, sync_warmup_cluster)
+ testCase("test_balance_warm_up_tbl")
+ }
+}
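
test_balance_use_compute_group_properties.groovy checks that a group-level balance_type property governs that group's rebalancing, while a group that never sets one (compute_cluster here) follows the FE-wide cloud_warm_up_for_rebalance_type config. A hedged sketch of that precedence, with illustrative names rather than the rebalancer's real code:

import java.util.Map;

// Illustrative precedence rule observed in the suite above: a compute group's own
// balance_type property wins; otherwise the global FE config is used.
final class EffectiveBalanceTypeSketch {
    static String resolve(Map<String, String> groupProperties, String globalRebalanceType) {
        String groupType = groupProperties == null ? null : groupProperties.get("balance_type");
        return (groupType == null || groupType.isEmpty()) ? globalRebalanceType : groupType;
    }

    public static void main(String[] args) {
        // compute_cluster sets nothing, so it follows cloud_warm_up_for_rebalance_type=sync_warmup
        System.out.println(resolve(Map.of(), "sync_warmup"));                               // sync_warmup
        System.out.println(resolve(Map.of("balance_type", "async_warmup"), "sync_warmup")); // async_warmup
    }
}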
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 7a18f22bb31..50476800d75 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -29,7 +29,7 @@ suite('test_balance_warm_up', 'docker') {
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1',
'rehash_tablet_after_be_dead_seconds=3600',
- 'enable_cloud_warm_up_for_rebalance=true'
+ 'cloud_warm_up_for_rebalance_type=async_warmup'
]
options.beConfigs += [
'report_tablet_interval_seconds=1',
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy
new file mode 100644
index 00000000000..19c252fba85
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up_sync_cache', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=sync_warmup',
+ 'cloud_pre_heating_time_limit_sec=30'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def mergeDirs = { base, add ->
+ base + add.collectEntries { host, hashFiles ->
+ [(host): base[host] ? (base[host] + hashFiles) : hashFiles]
+ }
+ }
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+
+ // disable cloud balance
+ setFeConfig('enable_cloud_multi_replica', true)
+ cluster.addBackend(1, "compute_cluster")
+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+ setFeConfig('enable_cloud_multi_replica', false)
+
+ sleep(5 * 1000)
+ sql """
+ insert into $table values (50, '4'), (60, '6')
+ """
+ // version 4, new rs after warm up task
+ def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort,
table, 4)
+ logger.info("cache dir version 4 {}", beforeCacheDirVersion4)
+ def afterMerged23CacheDir = [beforeCacheDirVersion2,
beforeCacheDirVersion3, beforeCacheDirVersion4]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+ logger.info("after version 4 fe tablets {}, be tablets {}, cache dir
{}", beforeGetFromFe, beforeGetFromBe, afterMerged23CacheDir)
+
+ // after cloud_pre_heating_time_limit_sec = 30s
+ sleep(40 * 1000)
+ // check tablet still in old be
+ def beforeWarmUpTaskOkDis = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up dis result {}", beforeWarmUpTaskOkDis)
+ assert beforeWarmUpTaskOkDis.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 2
+ }
+
+
GetDebugPoint().disableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+ def oldBe = sql_return_maparray('show backends').get(0)
+ def newAddBe = sql_return_maparray('show backends').get(1)
+ // balance tablet
+ awaitUntil(500) {
+ def afterWarmUpTaskOkResult = sql_return_maparray """ADMIN SHOW
REPLICA DISTRIBUTION FROM $table"""
+ logger.info("after warm up result {}", afterWarmUpTaskOkResult)
+ afterWarmUpTaskOkResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ // from be1 -> be2, warm up this tablet
+ // after add be
+ def afterGetFromFe = getTabletAndBeHostFromFe(table)
+ def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+ // version 3
+ def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+ sleep(5 * 1000)
+ // version 4
+ def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort,
table, 4)
+ logger.info("after cache dir version 4 {}", afterCacheDirVersion4)
+
+ def afterMergedCacheDir = [afterCacheDirVersion2,
afterCacheDirVersion3, afterCacheDirVersion4]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+
+ logger.info("after fe tablets {}, be tablets {}, cache dir {}",
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+ def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+ logger.info("new add be cache dir {}", newAddBeCacheDir)
+ assert newAddBeCacheDir.size() != 0
+ assert
afterMerged23CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host])
+
+ def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+ def dataPath = new File("${be.path}/storage/file_cache")
+ logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+ logger.info("Directory exists: {}", dataPath.exists())
+
+ def subDirs = []
+
+ def collectDirs
+ collectDirs = { File dir ->
+ if (dir.exists()) {
+ dir.eachDir { subDir ->
+ logger.info("Found subdir: {}", subDir.name)
+ subDirs << subDir.name
+ collectDirs(subDir)
+ }
+ }
+ }
+
+ collectDirs(dataPath)
+ logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+ newAddBeCacheDir.each { hashFile ->
+ assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} not found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+ }
+
+ docker(options) {
+ testCase("test_balance_warm_up_sync_tbl")
+ }
+}
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy
new file mode 100644
index 00000000000..0938126d61c
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up_task_abnormal', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=sync_warmup',
+ 'cloud_pre_heating_time_limit_sec=10'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def mergeDirs = { base, add ->
+ base + add.collectEntries { host, hashFiles ->
+ [(host): base[host] ? (base[host] + hashFiles) : hashFiles]
+ }
+ }
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+ assert beforeWarmUpResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 2
+ }
+
+ // disable cloud balance
+ setFeConfig('enable_cloud_multi_replica', true)
+ cluster.addBackend(1, "compute_cluster")
+ // sync warm up task always return false
+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+ setFeConfig('enable_cloud_multi_replica', false)
+
+ // wait for some time to make sure warm up task is processed, but
mapping is not changed
+ sleep(15 * 1000)
+ // check mapping is not changed
+ def afterAddBeResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("after add be result {}", afterAddBeResult)
+ // two BEs, but both replicas still map to the old BE
+ assert afterAddBeResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 2
+ }
+
+ // test recover from abnormal
+ sql """ALTER COMPUTE GROUP compute_cluster PROPERTIES
('balance_type'='without_warmup')"""
+ sleep(5 * 1000)
+
+ def afterAlterResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("after alter balance policy result {}", afterAlterResult)
+ // now mapping is changed to 1 replica in each be
+ assert afterAlterResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ docker(options) {
+ testCase("test_balance_warm_up_task_abnormal_tbl")
+ }
+}
diff --git
a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
new file mode 100644
index 00000000000..8ce71606542
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_peer_read_async_warmup', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=peer_read_async_warmup',
+ // disable Auto Analysis Job Executor
+ 'auto_check_statistics_in_minutes=60',
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*',
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def mergeDirs = { base, add ->
+ base + add.collectEntries { host, hashFiles ->
+ [(host): base[host] ? (base[host] + hashFiles) : hashFiles]
+ }
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+
+ // disable cloud balance
+ setFeConfig('enable_cloud_multi_replica', true)
+ cluster.addBackend(1, "compute_cluster")
+
GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader::download_segment_file_sleep",
[sleep_time: 50])
+ setFeConfig('enable_cloud_multi_replica', false)
+ awaitUntil(500) {
+ def afterRebalanceResult = sql_return_maparray """ADMIN SHOW
REPLICA DISTRIBUTION FROM $table"""
+ logger.info("after rebalance result {}", afterRebalanceResult)
+ afterRebalanceResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ sql """
+ insert into $table values (50, '4'), (60, '6')
+ """
+ // version 4, in new be, but not in old be
+ def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort,
table, 4)
+ logger.info("cache dir version 4 {}", beforeCacheDirVersion4)
+
+ sql """
+ insert into $table values (70, '7'), (80, '8')
+ """
+ // version 5, in new be, but not in old be
+ def beforeCacheDirVersion5 = getTabletFileCacheDirFromBe(msHttpPort,
table, 5)
+ logger.info("cache dir version 5 {}", beforeCacheDirVersion5)
+
+ def afterMerged2345CacheDir = [beforeCacheDirVersion2,
beforeCacheDirVersion3, beforeCacheDirVersion4, beforeCacheDirVersion5]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+ logger.info("after version 2,3,4,5 fe tablets {}, be tablets {}, cache
dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged2345CacheDir)
+
+ def oldBe = sql_return_maparray('show backends').get(0)
+ def newAddBe = sql_return_maparray('show backends').get(1)
+
+ def newAddBeCacheDir = afterMerged2345CacheDir.get(newAddBe.Host)
+ logger.info("new add be cache dir {}", newAddBeCacheDir)
+ // version 4, 5
+ assertTrue(newAddBeCacheDir.size() == 2, "new add be should have
version 4,5 cache file")
+ // warm up task blocked by debug point, so old be should not have
version 4,5 cache file
+
assertFalse(afterMerged2345CacheDir[oldBe.Host].containsAll(newAddBeCacheDir),
"old be should not have version 4,5 cache file")
+
+ // The query triggers reading the file cache from the peer
+ profile("test_peer_read_async_warmup_profile") {
+ sql """ set enable_profile = true;"""
+ sql """ set profile_level = 2;"""
+ run {
+ sql """/* test_peer_read_async_warmup_profile */ select * from
$table"""
+ sleep(1000)
+ }
+
+ check { profileString, exception ->
+ log.info(profileString)
+ // Use a regular expression to extract the numeric value after "NumPeerIOTotal:"
+ def matcher = (profileString =~ /- NumPeerIOTotal:\s+(\d+)/)
+ def total = 0
+ while (matcher.find()) {
+ total += matcher.group(1).toInteger()
+ logger.info("NumPeerIOTotal: {}", matcher.group(1))
+ }
+ assertTrue(total > 0)
+ }
+ }
+
+ // peer read cache, so it should read version 2,3 cache file from old
be, not s3
+ assertTrue(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"cached_remote_reader_peer_read"), "new add be should have peer read cache")
+ assertTrue(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"cached_remote_reader_s3_read"), "new add be should not have s3 read cache")
+ }
+
+ docker(options) {
+ testCase("test_peer_read_async_warmup_tbl")
+ }
+}
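
test_peer_read_async_warmup.groovy asserts on BE brpc counters (cached_remote_reader_peer_read and cached_remote_reader_s3_read) by scraping the /brpc_metrics endpoint with a regex, as in its getBrpcMetrics helper. A hedged Java equivalent for readers who want to poke the same endpoint by hand:

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical Java counterpart of the groovy getBrpcMetrics helper above:
// fetch a BE's /brpc_metrics page and extract a single counter by name.
final class BrpcMetricsSketch {
    static long getMetric(String host, int brpcPort, String name)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + brpcPort + "/brpc_metrics"))
                .build();
        String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        Matcher m = Pattern.compile(Pattern.quote(name) + "\\s+(\\d+)").matcher(body);
        if (!m.find()) {
            throw new IOException(name + " not found for " + host + ":" + brpcPort);
        }
        return Long.parseLong(m.group(1));
    }
}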
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy
similarity index 98%
rename from
regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
rename to regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy
index aa35a70e121..c0dc2f9747c 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy
@@ -25,7 +25,7 @@ suite('test_warmup_rebalance_in_cloud', 'multi_cluster,
docker') {
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
- 'enable_cloud_warm_up_for_rebalance=true',
+ 'cloud_warm_up_for_rebalance_type=async_warmup',
'cloud_tablet_rebalancer_interval_second=1',
'cloud_balance_tablet_percent_per_run=0.5',
'sys_log_verbose_modules=org',
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]