This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f864a9caadf [Fix] Some fix for workload group (#50577)
f864a9caadf is described below
commit f864a9caadfb78e9171f6836b344d96f5e024870
Author: wangbo <[email protected]>
AuthorDate: Fri May 9 11:00:06 2025 +0800
[Fix] Some fix for workload group (#50577)
---
.../resource/computegroup/ComputeGroupMgr.java | 2 +-
.../workloadgroup/BindWgToComputeGroupThread.java | 46 ++++++++++++++++-----
.../resource/workloadgroup/WorkloadGroup.java | 15 +++++++
.../resource/workloadgroup/WorkloadGroupMgr.java | 4 +-
.../apache/doris/catalog/CreateEncryptKeyTest.java | 2 +-
.../apache/doris/utframe/TestWithFeService.java | 2 +-
.../data/workload_manager_p0/test_curd_wlg.out | Bin 3559 -> 3479 bytes
.../workload_manager_p0/test_curd_wlg.groovy | 22 +++++++---
8 files changed, 70 insertions(+), 23 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
index c350f48db32..608dc87508f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
@@ -46,7 +46,7 @@ public class ComputeGroupMgr {
}
return new CloudComputeGroup(clusterId, name,
cloudSystemInfoService);
} else {
- return new ComputeGroup("", name, systemInfoService);
+ return new ComputeGroup(name, name, systemInfoService);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/BindWgToComputeGroupThread.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/BindWgToComputeGroupThread.java
index 342eb858fec..0862f566c0b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/BindWgToComputeGroupThread.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/BindWgToComputeGroupThread.java
@@ -18,11 +18,14 @@
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroupMgr;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -58,15 +61,44 @@ public class BindWgToComputeGroupThread extends Thread {
return;
}
try {
+ // 1 wait catalog ready
waitCatalogReady();
-
Env.getCurrentEnv().getWorkloadGroupMgr().tryCreateNormalWorkloadGroup();
- createNewComputeGroup();
+
+ // 2 wait backend ready
+ Set<String> cgIdSet = waitAllBackendReady();
+
+ // 3 try create default workload group
+ String defaultCg = Tag.DEFAULT_BACKEND_TAG.value;
+ if (Config.isCloudMode()) {
+ defaultCg = ((CloudSystemInfoService)
Env.getCurrentEnv().getClusterInfo()).getCloudClusterIdByName(
+ Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME);
+ }
+ if (!StringUtils.isEmpty(defaultCg)) {
+
Env.getCurrentEnv().getWorkloadGroupMgr().tryCreateNormalWorkloadGroup(defaultCg);
+ } else {
+ LOG.warn("[init_wg]can not find default compute group.");
+ }
+
+ // 4 try create new workload group
+ createNewWorkloadGroup(cgIdSet);
} catch (Throwable t) {
LOG.info("[init_wg]Error happens when drop old workload group, ",
t);
}
}
- private void createNewComputeGroup() throws InterruptedException {
+ private Set<String> waitAllBackendReady() throws InterruptedException {
+ ComputeGroupMgr cgMgr = Env.getCurrentEnv().getComputeGroupMgr();
+ Set<String> idSet = cgMgr.getAllComputeGroupIds();
+ while (idSet.size() == 0) {
+ LOG.info("[init_wg]Not get any backends, sleep");
+ Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000);
+ idSet = cgMgr.getAllComputeGroupIds();
+ }
+ LOG.info("[init_wg]Get cgs from backend, {}", String.join(",", idSet));
+ return idSet;
+ }
+
+ private void createNewWorkloadGroup(Set<String> idSet) {
WorkloadGroupMgr wgMgr = Env.getCurrentEnv().getWorkloadGroupMgr();
LOG.info("[init_wg] print current cg before, id map:{}, name map : {}",
wgMgr.getIdToWorkloadGroup(),
@@ -77,14 +109,6 @@ public class BindWgToComputeGroupThread extends Thread {
return;
}
- ComputeGroupMgr cgMgr = Env.getCurrentEnv().getComputeGroupMgr();
- Set<String> idSet = cgMgr.getAllComputeGroupIds();
- while (idSet.size() == 0) {
- LOG.info("[init_wg]Not get any backends, sleep");
- Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000);
- idSet = cgMgr.getAllComputeGroupIds();
- }
- LOG.info("[init_wg]Get cgs from backend, {}", String.join(",", idSet));
for (WorkloadGroup wg : oldWgList) {
wgMgr.bindWorkloadGroupToComputeGroup(idSet, wg);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 5cf893fc004..e7d2c271930 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -18,6 +18,7 @@
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
@@ -575,6 +576,20 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
row.add(queryQueueDetail == null ? "0" :
String.valueOf(queryQueueDetail.first));
} else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
row.add(queryQueueDetail == null ? "0" :
String.valueOf(queryQueueDetail.second));
+ } else if (COMPUTE_GROUP.equals(key)) {
+ String val = properties.get(key);
+ if (!StringUtils.isEmpty(val) && Config.isCloudMode()) {
+ try {
+ String cgName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getClusterNameByClusterId(
+ val);
+ if (!StringUtils.isEmpty(cgName)) {
+ val = cgName;
+ }
+ } catch (Throwable t) {
+ LOG.debug("get compute group failed, ", t);
+ }
+ }
+ row.add(val);
} else {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 160c88b91b5..b2b35ee69e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -620,14 +620,12 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
return oldWgList;
}
- public void tryCreateNormalWorkloadGroup() {
+ public void tryCreateNormalWorkloadGroup(String defaultCgName) {
writeLock();
try {
LOG.info("[init_wg] before create normal wg, id map: {}, name map:
{}", idToWorkloadGroup,
keyToWorkloadGroup);
if (idToWorkloadGroup.isEmpty()) {
- String defaultCgName = Config.isCloudMode() ?
Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME
- : Tag.VALUE_DEFAULT_TAG;
Map<String, String> properties = Maps.newHashMap();
properties.put(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT, "true");
properties.put(WorkloadGroup.COMPUTE_GROUP, defaultCgName);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateEncryptKeyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateEncryptKeyTest.java
index c062a52d17d..86e51d71bd2 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateEncryptKeyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateEncryptKeyTest.java
@@ -58,7 +58,7 @@ public class CreateEncryptKeyTest {
@Test
public void test() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
-
Env.getCurrentEnv().getWorkloadGroupMgr().tryCreateNormalWorkloadGroup();
+
Env.getCurrentEnv().getWorkloadGroupMgr().tryCreateNormalWorkloadGroup("default");
// create database db1
String createDbStmtStr = "create database db1;";
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 98811023214..5e09c3e4be4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -164,7 +164,7 @@ public abstract class TestWithFeService {
beforeCluster();
createDorisCluster();
runBeforeAll();
-
Env.getCurrentEnv().getWorkloadGroupMgr().tryCreateNormalWorkloadGroup();
+
Env.getCurrentEnv().getWorkloadGroupMgr().tryCreateNormalWorkloadGroup("default");
}
protected void beforeCluster() {
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out
b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index bdeca8ccf9d..ea8dac8b9c9 100644
Binary files a/regression-test/data/workload_manager_p0/test_curd_wlg.out and
b/regression-test/data/workload_manager_p0/test_curd_wlg.out differ
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index e3a443afe3d..3ba731d9612 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -36,12 +36,15 @@ suite("test_crud_wlg") {
def forComputeGroupStr = "";
+ String computeGroupName = "default"
+
//cloud-mode
if (isCloudMode()) {
def clusters = sql " SHOW CLUSTERS; "
assertTrue(!clusters.isEmpty())
def validCluster = clusters[0][0]
forComputeGroupStr = " for $validCluster "
+ computeGroupName = validCluster
}
sql "drop workload group if exists bypass_group $forComputeGroupStr;"
@@ -134,13 +137,20 @@ suite("test_crud_wlg") {
");"
sql "set workload_group=test_group;"
- qt_show_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,compute_group,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name in ('normal','test_group')
order by name;"
+ qt_show_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name in ('normal','test_group')
order by name;"
+
+ def query_cg_result = sql "select distinct compute_group from
information_schema.workload_groups where name in ('normal','test_group')"
+ String cg_name = query_cg_result[0][0]
+ if (!computeGroupName.equals(cg_name)) {
+ logger.info("expected:" + computeGroupName + ", real:
$query_cg_result, " + cg_name)
+ assertTrue(false)
+ }
// test drop workload group
sql "create workload group if not exists test_drop_wg $forComputeGroupStr
properties ('cpu_share'='10')"
- qt_show_del_wg_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,compute_group
from information_schema.workload_groups where name in
('normal','test_group','test_drop_wg') order by name;"
+ qt_show_del_wg_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in
('normal','test_group','test_drop_wg') order by name;"
sql "drop workload group test_drop_wg $forComputeGroupStr"
- qt_show_del_wg_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,compute_group
from information_schema.workload_groups where name in
('normal','test_group','test_drop_wg') order by name;"
+ qt_show_del_wg_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in
('normal','test_group','test_drop_wg') order by name;"
// test memory_limit
test {
@@ -644,7 +654,7 @@ suite("test_crud_wlg") {
// test default value
sql "drop workload group if exists default_val_wg $forComputeGroupStr"
sql "create workload group default_val_wg $forComputeGroupStr
properties('enable_memory_overcommit'='true');"
- qt_select_default_val_wg_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,max_remote_scan_thread_num,min_remote_scan_thread_num,memory_low_watermark,memory_high_watermark,compute_group,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'default_val_wg'"
+ qt_select_default_val_wg_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,max_remote_scan_thread_num,min_remote_scan_thread_num,memory_low_watermark,memory_high_watermark,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'default_val_wg'"
sql """
alter workload group default_val_wg $forComputeGroupStr properties(
@@ -662,7 +672,7 @@ suite("test_crud_wlg") {
'remote_read_bytes_per_second'='10');
"""
- qt_select_default_val_wg_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,max_remote_scan_thread_num,min_remote_scan_thread_num,memory_low_watermark,memory_high_watermark,compute_group,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'default_val_wg'"
+ qt_select_default_val_wg_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,max_remote_scan_thread_num,min_remote_scan_thread_num,memory_low_watermark,memory_high_watermark,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'default_val_wg'"
sql """
alter workload group default_val_wg $forComputeGroupStr properties(
@@ -681,7 +691,7 @@ suite("test_crud_wlg") {
);
"""
- qt_select_default_val_wg_3 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,max_remote_scan_thread_num,min_remote_scan_thread_num,memory_low_watermark,memory_high_watermark,compute_group,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'default_val_wg'"
+ qt_select_default_val_wg_3 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,max_remote_scan_thread_num,min_remote_scan_thread_num,memory_low_watermark,memory_high_watermark,read_bytes_per_second,remote_read_bytes_per_second
from information_schema.workload_groups where name = 'default_val_wg'"
sql "drop workload group if exists default_val_wg $forComputeGroupStr"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]