This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba9ba646caa [chore](refactor) move query queue as member of workload
group (#53688)
ba9ba646caa is described below
commit ba9ba646caaccbd674dbb85466b4e5cb592eb4e8
Author: yiguolei <[email protected]>
AuthorDate: Fri Jul 25 13:56:24 2025 +0800
[chore](refactor) move query queue as member of workload group (#53688)
### What problem does this PR solve?
In the past, the query queue was not a member of the workload group, but its
lifecycle is the same as the workload group's, and there was a lot of code to
maintain this relationship.
In this PR, I make the query queue a member of the workload group to simplify
the code.
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../main/java/org/apache/doris/common/Config.java | 3 -
.../doris/analysis/CreateRoutineLoadStmt.java | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 1 -
.../org/apache/doris/load/loadv2/LoadManager.java | 10 +-
.../doris/load/routineload/KafkaTaskInfo.java | 6 +-
.../plans/commands/info/CreateRoutineLoadInfo.java | 4 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 20 +--
.../org/apache/doris/qe/NereidsCoordinator.java | 18 +--
.../computegroup/AllBackendComputeGroup.java | 4 +
.../doris/resource/computegroup/ComputeGroup.java | 4 +-
.../doris/resource/workloadgroup/QueryQueue.java | 12 +-
.../resource/workloadgroup/WorkloadGroup.java | 140 +++++++++++----------
.../resource/workloadgroup/WorkloadGroupMgr.java | 138 ++------------------
.../apache/doris/service/FrontendServiceImpl.java | 6 +-
.../workloadgroup/WorkloadGroupMgrTest.java | 62 ++-------
.../resource/workloadgroup/WorkloadGroupTest.java | 3 +-
.../data/workload_manager_p0/test_curd_wlg.out | Bin 3479 -> 3515 bytes
.../workload_manager_p0/test_curd_wlg.groovy | 29 -----
18 files changed, 147 insertions(+), 317 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index acaf27c5850..05c988fa594 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1969,9 +1969,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_workload_group = true;
- @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
- public static boolean enable_wg_memory_sum_limit = true;
-
@ConfField(mutable = true)
public static boolean enable_query_queue = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index c0301e91224..ba647e74a37 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -36,7 +36,7 @@ import
org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
-import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -526,7 +526,7 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
}
tmpCtx.setCurrentUserIdentity(ConnectContext.get().getCurrentUserIdentity());
tmpCtx.getSessionVariable().setWorkloadGroup(inputWorkloadGroupStr);
- List<TPipelineWorkloadGroup> wgList =
Env.getCurrentEnv().getWorkloadGroupMgr()
+ List<WorkloadGroup> wgList =
Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroup(tmpCtx);
if (wgList.size() == 0) {
throw new UserException("Can not find workload group " +
inputWorkloadGroupStr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index fd014114f4b..abddf8c44a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1974,7 +1974,6 @@ public class Env {
dnsCache.start();
- workloadGroupMgr.start();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();
admissionControl.start();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 9f52d057af3..a2e1ae10cda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -118,7 +118,10 @@ public class LoadManager implements Writable {
List<TPipelineWorkloadGroup> twgList = null;
if (Config.enable_workload_group) {
try {
- twgList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get());
+ twgList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get())
+ .stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
} catch (Throwable t) {
LOG.info("Get workload group failed when create load job,", t);
throw t;
@@ -167,7 +170,10 @@ public class LoadManager implements Writable {
List<TPipelineWorkloadGroup> twgList = null;
if (Config.enable_workload_group) {
try {
- twgList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get());
+ twgList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get())
+ .stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
} catch (Throwable t) {
LOG.info("Get workload group failed when create load job,", t);
throw t;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 16eb812397f..a3d87ccef8b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private RoutineLoadManager routineLoadManager =
Env.getCurrentEnv().getRoutineLoadManager();
@@ -158,7 +159,10 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tmpContext.getSessionVariable().setWorkloadGroup(wgName);
}
- tWgList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(tmpContext);
+ tWgList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(tmpContext)
+ .stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
if (tWgList.size() != 0) {
tExecPlanFragmentParams.setWorkloadGroups(tWgList);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index e6c9c5fea29..6be432073fa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -55,7 +55,7 @@ import
org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause;
import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -485,7 +485,7 @@ public class CreateRoutineLoadInfo {
if (Config.isCloudMode()) {
tmpCtx.setCloudCluster(ConnectContext.get().getCloudCluster());
}
- List<TPipelineWorkloadGroup> wgList =
Env.getCurrentEnv().getWorkloadGroupMgr()
+ List<WorkloadGroup> wgList =
Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroup(tmpCtx);
if (wgList.size() == 0) {
throw new UserException("Can not find workload group " +
inputWorkloadGroupStr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d68b91fb409..f3d99dbbb4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -79,6 +79,7 @@ import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
@@ -654,24 +655,25 @@ public class Coordinator implements CoordInterface {
// LoadTask does not have context, not controlled by queue now
if (context != null) {
if (Config.enable_workload_group) {
- List<TPipelineWorkloadGroup> wgList =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context);
+ List<WorkloadGroup> wgs =
Env.getCurrentEnv().getWorkloadGroupMgr()
+ .getWorkloadGroup(context);
+ List<TPipelineWorkloadGroup> wgList = wgs.stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
this.setTWorkloadGroups(wgList);
- boolean shouldQueue = this.shouldQueue();
- if (shouldQueue) {
- Set<Long> wgIdSet = Sets.newHashSet();
- for (TPipelineWorkloadGroup twg : wgList) {
- wgIdSet.add(twg.getId());
- }
- queryQueue =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(wgIdSet);
- if (queryQueue == null) {
+ if (this.shouldQueue()) {
+ if (wgs.size() < 1) {
// This logic is actually useless, because when could
not find query queue, it will
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
+ // AllBackendComputeGroup may associate with multiple
workload groups
+ queryQueue = wgs.get(0).getQueryQueue();
queueToken =
queryQueue.getToken(context.getSessionVariable().wgQuerySlotCount);
queueToken.get(DebugUtil.printId(queryId),
this.queryOptions.getExecutionTimeout() * 1000);
}
+ context.setWorkloadGroupName(wgs.get(0).getName());
} else {
context.setWorkloadGroupName("");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index 7ad22e6eac1..641d433c429 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -51,6 +51,7 @@ import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
import org.apache.doris.qe.runtime.ThriftPlansBuilder;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TErrorTabletInfo;
@@ -66,14 +67,13 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.stream.Collectors;
/** NereidsCoordinator */
public class NereidsCoordinator extends Coordinator {
@@ -472,24 +472,24 @@ public class NereidsCoordinator extends Coordinator {
// LoadTask does not have context, not controlled by queue now
if (context != null && needEnqueue) {
if (Config.enable_workload_group) {
- List<TPipelineWorkloadGroup> wgList =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context);
+ List<WorkloadGroup> wgs =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context);
+ List<TPipelineWorkloadGroup> wgList = wgs.stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
this.setTWorkloadGroups(wgList);
if (shouldQueue(context)) {
- Set<Long> wgIdSet = Sets.newHashSet();
- for (TPipelineWorkloadGroup twg : wgList) {
- wgIdSet.add(twg.getId());
- }
- QueryQueue queryQueue =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(wgIdSet);
- if (queryQueue == null) {
+ if (wgs.size() < 1) {
// This logic is actually useless, because when could
not find query queue, it will
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
+ QueryQueue queryQueue = wgs.get(0).getQueryQueue();
QueueToken queueToken =
queryQueue.getToken(context.getSessionVariable().wgQuerySlotCount);
int queryTimeout =
coordinatorContext.queryOptions.getExecutionTimeout() * 1000;
coordinatorContext.setQueueInfo(queryQueue, queueToken);
queueToken.get(DebugUtil.printId(coordinatorContext.queryId), queryTimeout);
}
+ context.setWorkloadGroupName(wgs.get(0).getName());
} else {
context.setWorkloadGroupName("");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
index 86f7cd4e193..ed5b08eab1b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
@@ -65,6 +65,10 @@ public class AllBackendComputeGroup extends ComputeGroup {
return
systemInfoService.getAllClusterBackendsNoException().values().asList();
}
+ // In non-cloud mode, if the root or admin user does not set a resource
group name, then all
+ // resource groups will be used (this is by design in the past). So we need
to get all workload
+ // groups from the different resource groups and publish the workload groups
to all backends.
+ // The backend will filter out the workload groups that are pre-created
locally.
@Override
public List<WorkloadGroup> getWorkloadGroup(String wgName,
WorkloadGroupMgr wgMgr) throws UserException {
List<WorkloadGroup> wgList = Lists.newArrayList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
index c75f32285a1..200a9435ed4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
@@ -98,14 +98,12 @@ public class ComputeGroup {
// use wgMgr as args is just for FE UT, otherwise get wgMgr from env is
hard to mock
public List<WorkloadGroup> getWorkloadGroup(String wgName,
WorkloadGroupMgr wgMgr) throws UserException {
- List<WorkloadGroup> wgList = Lists.newArrayList();
WorkloadGroup wg = wgMgr
.getWorkloadGroupByComputeGroup(WorkloadGroupKey.get(id,
wgName));
if (wg == null) {
throw new UserException("Can not find workload group " + wgName +
" in compute croup " + name);
}
- wgList.add(wg);
- return wgList;
+ return Lists.newArrayList(wg);
}
private void checkInvalidComputeGroup() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 82822c05a0d..e792b56e6b6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -49,8 +49,6 @@ public class QueryQueue {
private long wgId;
- private long propVersion;
-
private PriorityQueue<QueueToken> waitingQueryQueue;
private Queue<QueueToken> runningQueryQueue;
@@ -63,10 +61,6 @@ public class QueryQueue {
}
}
- long getPropVersion() {
- return propVersion;
- }
-
long getWgId() {
return wgId;
}
@@ -83,18 +77,17 @@ public class QueryQueue {
return queueTimeout;
}
- public QueryQueue(long wgId, int maxConcurrency, int maxQueueSize, int
queueTimeout, long propVersion) {
+ public QueryQueue(long wgId, int maxConcurrency, int maxQueueSize, int
queueTimeout) {
this.wgId = wgId;
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queueTimeout;
- this.propVersion = propVersion;
this.waitingQueryQueue = new PriorityQueue<QueueToken>();
this.runningQueryQueue = new LinkedList<QueueToken>();
}
public String debugString() {
- return "wgId= " + wgId + ", version=" + this.propVersion +
",maxConcurrency=" + maxConcurrency
+ return "wgId= " + wgId + ",maxConcurrency=" + maxConcurrency
+ ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" +
queueTimeout + ", currentRunningQueryNum="
+ runningQueryQueue.size() + ", currentWaitingQueryNum=" +
waitingQueryQueue.size();
}
@@ -201,7 +194,6 @@ public class QueryQueue {
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queryWaitTimeout;
- this.propVersion = version;
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(this.debugString());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 134edb91f2f..d7184463e9d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -109,6 +109,11 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
.put(SPILL_THRESHOLD_LOW_WATERMARK, MEMORY_LOW_WATERMARK)
.put(SPILL_THRESHOLD_HIGH_WATERMARK,
MEMORY_HIGH_WATERMARK).build();
+
+ public static final int CPU_HARD_LIMIT_DEFAULT_VALUE = 100;
+ // Memory limit is a string value ending with % in BE, so it is different
from the other limits,
+ // which are numbers.
+ public static final String MEMORY_LIMIT_DEFAULT_VALUE = "100%";
public static final int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 75;
public static final int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 85;
@@ -116,9 +121,9 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
static {
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(CPU_SHARE, "-1");
- ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(CPU_HARD_LIMIT, "-1");
- ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LIMIT, "-1");
- ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(ENABLE_MEMORY_OVERCOMMIT, "true");
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(CPU_HARD_LIMIT, "100");
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LIMIT, "100%");
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(ENABLE_MEMORY_OVERCOMMIT,
"false");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_CONCURRENCY,
String.valueOf(Integer.MAX_VALUE));
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_QUEUE_SIZE, "0");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(QUEUE_TIMEOUT, "0");
@@ -153,12 +158,7 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
@SerializedName(value = "version")
private long version;
- private double memoryLimitPercent = 0;
- private int maxConcurrency = Integer.MAX_VALUE;
- private int maxQueueSize = 0;
- private int queueTimeout = 0;
-
- private int cpuHardLimit = 0;
+ private QueryQueue queryQueue;
WorkloadGroup(long id, String name, Map<String, String> properties) {
this(id, name, properties, 0);
@@ -169,9 +169,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
this.name = name;
this.properties = properties;
this.version = version;
- if (properties.containsKey(MEMORY_LIMIT)) {
- setMemLimitPercent(properties);
- }
if (properties.containsKey(WRITE_BUFFER_RATIO)) {
String loadBufLimitStr = properties.get(WRITE_BUFFER_RATIO);
@@ -192,21 +189,40 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
properties.put(ENABLE_MEMORY_OVERCOMMIT,
properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
+ } else {
+ properties.put(ENABLE_MEMORY_OVERCOMMIT, "false");
}
+
if (properties.containsKey(CPU_HARD_LIMIT)) {
String cpuHardLimitStr = properties.get(CPU_HARD_LIMIT);
if (cpuHardLimitStr.endsWith("%")) {
cpuHardLimitStr = cpuHardLimitStr.substring(0,
cpuHardLimitStr.length() - 1);
}
- this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr);
this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr);
+ } else {
+ this.properties.put(CPU_HARD_LIMIT, CPU_HARD_LIMIT_DEFAULT_VALUE +
"");
}
+
+ if (properties.containsKey(MEMORY_LIMIT)) {
+ String memHardLimitStr = properties.get(MEMORY_LIMIT);
+ // If the input is -1, it means use all memory, in this version we
could change it to 100% now
+ // since sum of all group's memory could be larger than 100%
+ if (memHardLimitStr.equals("-1")) {
+ memHardLimitStr = MEMORY_LIMIT_DEFAULT_VALUE;
+ }
+ this.properties.put(MEMORY_LIMIT, memHardLimitStr);
+ } else {
+ this.properties.put(MEMORY_LIMIT, MEMORY_LIMIT_DEFAULT_VALUE);
+ }
+
if (properties.containsKey(MEMORY_LOW_WATERMARK)) {
String lowWatermarkStr = properties.get(MEMORY_LOW_WATERMARK);
if (lowWatermarkStr.endsWith("%")) {
lowWatermarkStr = lowWatermarkStr.substring(0,
lowWatermarkStr.length() - 1);
}
this.properties.put(MEMORY_LOW_WATERMARK, lowWatermarkStr);
+ } else {
+ this.properties.put(MEMORY_LOW_WATERMARK,
MEMORY_LOW_WATERMARK_DEFAULT_VALUE + "");
}
if (properties.containsKey(MEMORY_HIGH_WATERMARK)) {
String highWatermarkStr = properties.get(MEMORY_HIGH_WATERMARK);
@@ -214,29 +230,10 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
highWatermarkStr = highWatermarkStr.substring(0,
highWatermarkStr.length() - 1);
}
this.properties.put(MEMORY_HIGH_WATERMARK, highWatermarkStr);
- }
- resetQueueProperty(properties);
- }
-
- private void resetQueueProperty(Map<String, String> properties) {
- if (properties.containsKey(MAX_CONCURRENCY)) {
- this.maxConcurrency =
Integer.parseInt(properties.get(MAX_CONCURRENCY));
} else {
- this.maxConcurrency = Integer.MAX_VALUE;
- properties.put(MAX_CONCURRENCY,
String.valueOf(this.maxConcurrency));
- }
- if (properties.containsKey(MAX_QUEUE_SIZE)) {
- this.maxQueueSize =
Integer.parseInt(properties.get(MAX_QUEUE_SIZE));
- } else {
- this.maxQueueSize = 0;
- properties.put(MAX_QUEUE_SIZE, String.valueOf(maxQueueSize));
- }
- if (properties.containsKey(QUEUE_TIMEOUT)) {
- this.queueTimeout =
Integer.parseInt(properties.get(QUEUE_TIMEOUT));
- } else {
- this.queueTimeout = 0;
- properties.put(QUEUE_TIMEOUT, String.valueOf(queueTimeout));
+ this.properties.put(MEMORY_HIGH_WATERMARK,
MEMORY_HIGH_WATERMARK_DEFAULT_VALUE + "");
}
+ initQueryQueue();
}
// new resource group
@@ -540,6 +537,10 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
return name;
}
+ public QueryQueue getQueryQueue() {
+ return queryQueue;
+ }
+
public WorkloadGroupKey getWorkloadGroupKey() {
return WorkloadGroupKey.get(this.getComputeGroup(), this.getName());
}
@@ -552,23 +553,11 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
return version;
}
- public int getMaxConcurrency() {
- return maxConcurrency;
- }
-
- public int getMaxQueueSize() {
- return maxQueueSize;
- }
-
- public int getQueueTimeout() {
- return queueTimeout;
- }
-
- public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
+ public void getProcNodeData(BaseProcResult result) {
List<String> row = new ArrayList<>();
row.add(String.valueOf(id));
row.add(name);
- Pair<Integer, Integer> queryQueueDetail = qq != null ?
qq.getQueryQueueDetail() : null;
+ Pair<Integer, Integer> queryQueueDetail =
queryQueue.getQueryQueueDetail();
// skip id,name,running query,waiting query
for (int i = 2; i <
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
String key =
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
@@ -606,12 +595,9 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
result.addRow(row);
}
- public double getMemoryLimitPercentWhenCalSum() {
- return memoryLimitPercent == -1 ? 0 : memoryLimitPercent;
- }
-
public double getMemoryLimitPercent() {
- return memoryLimitPercent;
+ String memoryStr = properties.get(MEMORY_LIMIT);
+ return Double.valueOf(memoryStr.substring(0, memoryStr.length() - 1));
}
public String getComputeGroup() {
@@ -746,22 +732,48 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
return workloadGroup;
}
- void setMemLimitPercent(Map<String, String> props) {
- String memoryLimitString = props.get(MEMORY_LIMIT);
- this.memoryLimitPercent = "-1".equals(memoryLimitString) ? -1
- : Double.parseDouble(memoryLimitString.substring(0,
memoryLimitString.length() - 1));
- }
-
@Override
public void gsonPostProcess() throws IOException {
+ // Do not use -1 as default value, using 100%
if (properties.containsKey(MEMORY_LIMIT)) {
- setMemLimitPercent(properties);
+ String memHardLimitStr = properties.get(MEMORY_LIMIT);
+ // If the input is -1, it means use all memory, in this version we
could change it to 100% now
+ // since sum of all group's memory could be larger than 100%
+ if (memHardLimitStr.equals("-1")) {
+ memHardLimitStr = MEMORY_LIMIT_DEFAULT_VALUE;
+ }
+ this.properties.put(MEMORY_LIMIT, memHardLimitStr);
+ } else {
+ this.properties.put(MEMORY_LIMIT, MEMORY_LIMIT_DEFAULT_VALUE);
}
+ // The from-json method just uses reflection logic to create a new
workload group,
+ // but the workload group's constructor needs to create other objects, like
the queue, so we need to
+ // init the queue here after the workload group is created from json
+ initQueryQueue();
+ }
- if (properties.containsKey(CPU_HARD_LIMIT)) {
- this.cpuHardLimit =
Integer.parseInt(properties.get(CPU_HARD_LIMIT));
+ private void initQueryQueue() {
+ int maxConcurrency = Integer.MAX_VALUE;
+ int maxQueueSize = 0;
+ int queueTimeout = 0;
+ if (properties.containsKey(MAX_CONCURRENCY)) {
+ maxConcurrency = Integer.parseInt(properties.get(MAX_CONCURRENCY));
+ } else {
+ maxConcurrency = Integer.MAX_VALUE;
+ properties.put(MAX_CONCURRENCY, String.valueOf(maxConcurrency));
}
-
- this.resetQueueProperty(this.properties);
+ if (properties.containsKey(MAX_QUEUE_SIZE)) {
+ maxQueueSize = Integer.parseInt(properties.get(MAX_QUEUE_SIZE));
+ } else {
+ maxQueueSize = 0;
+ properties.put(MAX_QUEUE_SIZE, String.valueOf(maxQueueSize));
+ }
+ if (properties.containsKey(QUEUE_TIMEOUT)) {
+ queueTimeout = Integer.parseInt(properties.get(QUEUE_TIMEOUT));
+ } else {
+ queueTimeout = 0;
+ properties.put(QUEUE_TIMEOUT, String.valueOf(queueTimeout));
+ }
+ queryQueue = new QueryQueue(id, maxConcurrency, maxQueueSize,
queueTimeout);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 8cdf28870c0..bb37ae46551 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -32,7 +32,6 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
-import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropWorkloadGroupOperationLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -41,7 +40,6 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.resource.computegroup.ComputeGroupMgr;
-import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TopicInfo;
@@ -58,14 +56,13 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class WorkloadGroupMgr extends MasterDaemon implements Writable,
GsonPostProcessable {
+public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
public static final String DEFAULT_GROUP_NAME = "normal";
@@ -96,51 +93,7 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
public static final String EMPTY_COMPUTE_GROUP = "";
- @Override
- protected void runAfterCatalogReady() {
- try {
- resetQueryQueueProp();
- } catch (Throwable e) {
- LOG.warn("reset query queue failed, ", e);
- }
- }
-
- public void resetQueryQueueProp() {
- List<QueryQueue> newPropList = new ArrayList<>();
- Map<Long, QueryQueue> currentQueueCopyMap = new HashMap<>();
- readLock();
- try {
- for (Map.Entry<Long, WorkloadGroup> entry :
idToWorkloadGroup.entrySet()) {
- WorkloadGroup wg = entry.getValue();
- QueryQueue tmpQ = new QueryQueue(wg.getId(),
wg.getMaxConcurrency(),
- wg.getMaxQueueSize(), wg.getQueueTimeout(),
wg.getVersion());
- newPropList.add(tmpQ);
- }
- for (Map.Entry<Long, QueryQueue> entry :
idToQueryQueue.entrySet()) {
- currentQueueCopyMap.put(entry.getKey(), entry.getValue());
- }
- } finally {
- readUnlock();
- }
-
- for (QueryQueue newPropQq : newPropList) {
- QueryQueue currentQueryQueue =
currentQueueCopyMap.get(newPropQq.getWgId());
- if (currentQueryQueue == null) {
- continue;
- }
- if (newPropQq.getPropVersion() >
currentQueryQueue.getPropVersion()) {
-
currentQueryQueue.resetQueueProperty(newPropQq.getMaxConcurrency(),
newPropQq.getMaxQueueSize(),
- newPropQq.getQueueTimeout(),
newPropQq.getPropVersion());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(currentQueryQueue.debugString()); // for test debug
- }
- }
- }
-
- public WorkloadGroupMgr() {
- super("workload-group-thread", Config.query_queue_update_interval_ms);
- }
+ public WorkloadGroupMgr() {}
public static WorkloadGroupMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
@@ -179,22 +132,16 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
return keyToWorkloadGroup.get(wgKey);
}
- public List<TPipelineWorkloadGroup> getWorkloadGroup(ConnectContext
context) throws UserException {
+ // the workload group name in AllBackendComputeGroup will associate with
multiple workload groups
+ public List<WorkloadGroup> getWorkloadGroup(ConnectContext context) throws
UserException {
String wgName = getWorkloadGroupNameAndCheckPriv(context);
ComputeGroup cg = context.getComputeGroup();
-
- List<TPipelineWorkloadGroup> workloadGroups = Lists.newArrayList();
readLock();
try {
- List<WorkloadGroup> wgList = cg.getWorkloadGroup(wgName, this);
- for (WorkloadGroup wg : wgList) {
- workloadGroups.add(wg.toThrift());
- }
- context.setWorkloadGroupName(wgName);
+ return cg.getWorkloadGroup(wgName, this);
} finally {
readUnlock();
}
- return workloadGroups;
}
public List<TopicInfo> getPublishTopicInfo() {
@@ -210,59 +157,16 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
return workloadGroups;
}
- public QueryQueue getWorkloadGroupQueryQueue(Set<Long> wgIdSet) throws
UserException {
- writeLock();
- try {
- QueryQueue queryQueue = null;
- for (long wgId : wgIdSet) {
- WorkloadGroup wg = idToWorkloadGroup.get(wgId);
- if (wg == null) {
- continue;
- }
- QueryQueue tmpQueue = idToQueryQueue.get(wg.getId());
- if (tmpQueue == null) {
- tmpQueue = new QueryQueue(wg.getId(),
wg.getMaxConcurrency(), wg.getMaxQueueSize(),
- wg.getQueueTimeout(), wg.getVersion());
- idToQueryQueue.put(wg.getId(), tmpQueue);
- queryQueue = tmpQueue;
- break;
- }
- if (queryQueue == null) {
- queryQueue = tmpQueue;
- } else {
- Pair<Integer, Integer> detail1 =
queryQueue.getQueryQueueDetail();
- Pair<Integer, Integer> detail2 =
tmpQueue.getQueryQueueDetail();
- if (detail2.first < detail1.first) {
- queryQueue = tmpQueue;
- }
- }
- }
- if (queryQueue == null) {
- throw new DdlException("Can not find query queue for workload
group: " + wgIdSet);
- }
- return queryQueue;
- } finally {
- writeUnlock();
- }
- }
-
public Map<String, List<String>> getWorkloadGroupQueryDetail() {
Map<String, List<String>> ret = Maps.newHashMap();
readLock();
try {
for (Map.Entry<Long, WorkloadGroup> entry :
idToWorkloadGroup.entrySet()) {
- Long wgId = entry.getKey();
WorkloadGroup wg = entry.getValue();
- QueryQueue qq = idToQueryQueue.get(wgId);
List<String> valueList = new ArrayList<>(2);
- if (qq == null) {
- valueList.add("0");
- valueList.add("0");
- } else {
- Pair<Integer, Integer> qdtail = qq.getQueryQueueDetail();
- valueList.add(String.valueOf(qdtail.first));
- valueList.add(String.valueOf(qdtail.second));
- }
+ Pair<Integer, Integer> qdtail =
wg.getQueryQueue().getQueryQueueDetail();
+ valueList.add(String.valueOf(qdtail.first));
+ valueList.add(String.valueOf(qdtail.second));
ret.put(wg.getWorkloadGroupKey().toString(), valueList);
}
} finally {
@@ -318,7 +222,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg)
throws DdlException {
String newWgCg = newWg.getComputeGroup();
- double sumOfAllMemLimit = 0;
int wgNumOfCurrentCg = 0;
boolean isAlterStmt = oldWg != null;
boolean isCreateStmt = !isAlterStmt;
@@ -339,21 +242,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
if (!newWgCg.equals(curWgCg)) {
continue;
}
-
- if (wg.getMemoryLimitPercentWhenCalSum() > 0) {
- sumOfAllMemLimit += wg.getMemoryLimitPercentWhenCalSum();
- }
- }
-
- // 2 sum current wg value
- sumOfAllMemLimit += newWg.getMemoryLimitPercentWhenCalSum();
-
- // 3 check total sum
- if (Config.enable_wg_memory_sum_limit && sumOfAllMemLimit > 100.0 +
1e-6) {
- throw new DdlException(
- "The sum of all workload group " +
WorkloadGroup.MEMORY_LIMIT + " within compute group " + (
- newWgCg)
- + " can not be greater than 100.0%. current sum
val:" + sumOfAllMemLimit);
}
// 4 check wg num
@@ -450,7 +338,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
WorkloadGroup workloadGroup = keyToWorkloadGroup.get(wgKey);
keyToWorkloadGroup.remove(wgKey);
idToWorkloadGroup.remove(workloadGroup.getId());
- idToQueryQueue.remove(workloadGroup.getId());
Env.getCurrentEnv().getEditLog()
.logDropWorkloadGroup(new
DropWorkloadGroupOperationLog(workloadGroup.getId()));
} finally {
@@ -550,11 +437,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
return idToWorkloadGroup;
}
- // for ut
- public Map<Long, QueryQueue> getIdToQueryQueue() {
- return idToQueryQueue;
- }
-
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
@@ -583,7 +465,7 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
workloadGroup.getName(),
PrivPredicate.SHOW_WORKLOAD_GROUP)) {
continue;
}
- workloadGroup.getProcNodeData(result,
idToQueryQueue.get(workloadGroup.getId()));
+ workloadGroup.getProcNodeData(result);
}
} finally {
readUnlock();
@@ -685,7 +567,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
for (WorkloadGroup wg : tombstoneWgList) {
keyToWorkloadGroup.remove(wg.getWorkloadGroupKey());
idToWorkloadGroup.remove(wg.getId());
- idToQueryQueue.remove(wg.getId());
Env.getCurrentEnv().getEditLog()
.logDropWorkloadGroup(new
DropWorkloadGroupOperationLog(wg.getId()));
LOG.info("Drop tombstone normal workload group {}
success.", wg);
@@ -730,7 +611,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
// drop old workload group
keyToWorkloadGroup.remove(oldKey);
idToWorkloadGroup.remove(oldWg.getId());
- idToQueryQueue.remove(oldWg.getId());
Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new
DropWorkloadGroupOperationLog(oldWg.getId()));
LOG.info("[init_wg]Drop old workload group {} success.", oldWg);
} catch (Throwable t) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 6dc75dd7701..833391ee834 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2038,8 +2038,10 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// mysql load request not carry user info, need fix it later.
boolean hasUserName = !StringUtils.isEmpty(request.getUser());
if (Config.enable_workload_group && hasUserName) {
- tWorkloadGroupList = Env.getCurrentEnv().getWorkloadGroupMgr()
- .getWorkloadGroup(ConnectContext.get());
+ tWorkloadGroupList =
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get())
+ .stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
}
if (!Strings.isNullOrEmpty(request.getLoadSql())) {
httpStreamPutImpl(request, result);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index c30360c15c3..b633307a721 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
public class WorkloadGroupMgrTest {
@@ -161,19 +162,13 @@ public class WorkloadGroupMgrTest {
Assert.assertTrue(idToRG.containsKey(group2.getId()));
Assert.assertTrue(key2.getComputeGroup().equals(wg2.getComputeGroup()));
- // 3 test memory limit exceeds
+ // 3 test memory limit exceeds, it will succeed
Map<String, String> properties3 = Maps.newHashMap();
properties3.put(WorkloadGroup.CPU_SHARE, "20");
properties3.put(WorkloadGroup.MEMORY_LIMIT, "90%");
properties3.put(WorkloadGroup.COMPUTE_GROUP, cg1);
String wgName3 = "wg3";
long wgId3 = 3;
- try {
- workloadGroupMgr.createWorkloadGroup(cg1, new WorkloadGroup(wgId3,
wgName3, properties3), false);
- Assert.fail();
- } catch (DdlException e) {
- Assert.assertTrue(e.getMessage().contains("can not be greater"));
- }
properties3.put(WorkloadGroup.MEMORY_LIMIT, "1%");
workloadGroupMgr.createWorkloadGroup(cg1, new WorkloadGroup(wgId3,
wgName3, properties3), false);
@@ -222,7 +217,10 @@ public class WorkloadGroupMgrTest {
ConnectContext ctx = new ConnectContext();
// 1.1 not set wg, get normal
ctx.setComputeGroup(new ComputeGroup(cgName1, cgName1, null));
- List<TPipelineWorkloadGroup> ret =
workloadGroupMgr.getWorkloadGroup(ctx);
+ List<TPipelineWorkloadGroup> ret =
workloadGroupMgr.getWorkloadGroup(ctx)
+ .stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
Assert.assertTrue(ret.get(0).getId() == 100);
ctx.setComputeGroup(new ComputeGroup(cgName2, cgName2, null));
@@ -247,7 +245,10 @@ public class WorkloadGroupMgrTest {
Map<String, String> prop3 = Maps.newHashMap();
prop3.put(WorkloadGroup.COMPUTE_GROUP, cgName1);
workloadGroupMgr.createWorkloadGroup(cgName1, new WorkloadGroup(wgId3,
wgName2, prop3), false);
- List<TPipelineWorkloadGroup> tPipWgList =
workloadGroupMgr.getWorkloadGroup(ctx);
+ List<TPipelineWorkloadGroup> tPipWgList =
workloadGroupMgr.getWorkloadGroup(ctx)
+ .stream()
+ .map(e -> e.toThrift())
+ .collect(Collectors.toList());
Set<Long> idSet = Sets.newHashSet();
for (TPipelineWorkloadGroup tpip : tPipWgList) {
idSet.add(tpip.getId());
@@ -260,7 +261,9 @@ public class WorkloadGroupMgrTest {
// 1.5 test get failed
ctx.getSessionVariable().setWorkloadGroup("abc");
try {
- workloadGroupMgr.getWorkloadGroup(ctx);
+ workloadGroupMgr.getWorkloadGroup(ctx)
+ .stream()
+ .map(e -> e.toThrift()).collect(Collectors.toList());
Assert.fail();
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("Can not find workload
group"));
@@ -336,7 +339,6 @@ public class WorkloadGroupMgrTest {
WorkloadGroup wg1 = new WorkloadGroup(wgId1, wgName1, prop1);
wgMgr.getIdToWorkloadGroup().put(wgId1, wg1);
wgMgr.getNameToWorkloadGroup().put(WorkloadGroupKey.get(WorkloadGroupMgr.EMPTY_COMPUTE_GROUP,
wgName1), wg1);
- wgMgr.getIdToQueryQueue().put(wgId1, new QueryQueue(0, 0, 0, 0, 0));
long wgId2 = 2;
String wgName2 = "wg2";
@@ -345,7 +347,6 @@ public class WorkloadGroupMgrTest {
WorkloadGroup wg2 = new WorkloadGroup(wgId2, wgName2, prop2);
wgMgr.getIdToWorkloadGroup().put(wgId2, wg2);
wgMgr.getNameToWorkloadGroup().put(WorkloadGroupKey.get(WorkloadGroupMgr.EMPTY_COMPUTE_GROUP,
wgName2), wg2);
- wgMgr.getIdToQueryQueue().put(wgId2, new QueryQueue(0, 0, 0, 0, 0));
long wgId3 = 3;
String wgName3 = "wg3";
@@ -354,7 +355,6 @@ public class WorkloadGroupMgrTest {
WorkloadGroup wg3 = new WorkloadGroup(wgId3, wgName3, prop3);
wgMgr.getIdToWorkloadGroup().put(wgId3, wg3);
wgMgr.getNameToWorkloadGroup().put(WorkloadGroupKey.get(WorkloadGroupMgr.EMPTY_COMPUTE_GROUP,
wgName3), wg3);
- wgMgr.getIdToQueryQueue().put(wgId3, new QueryQueue(0, 0, 0, 0, 0));
// create a duplicate wg3 which binds to a compute group
@@ -367,12 +367,10 @@ public class WorkloadGroupMgrTest {
prop4.put(WorkloadGroup.COMPUTE_GROUP, cg1);
WorkloadGroup wg4 = new WorkloadGroup(wgId4, wgName4, prop4);
wgMgr.createWorkloadGroup(cg1, wg4, false);
- wgMgr.getIdToQueryQueue().put(wgId4, new QueryQueue(0, 0, 0, 0, 0));
Assert.assertTrue(wgMgr.getIdToWorkloadGroup().size() == 4);
Assert.assertTrue(wgMgr.getNameToWorkloadGroup().size() == 4);
- Assert.assertTrue(wgMgr.getIdToQueryQueue().size() == 4);
Assert.assertTrue(wgMgr.getOldWorkloadGroup().size() == 3);
String cg2 = "cg2";
@@ -386,7 +384,6 @@ public class WorkloadGroupMgrTest {
Assert.assertTrue(wgMgr.getIdToWorkloadGroup().size() == 6);
Assert.assertTrue(wgMgr.getNameToWorkloadGroup().size() == 6);
- Assert.assertTrue(wgMgr.getIdToQueryQueue().size() == 1);
Assert.assertTrue(wgMgr.getOldWorkloadGroup().size() == 0);
Assert.assertTrue(wgMgr.getIdToWorkloadGroup().get(wgId1) == null);
Assert.assertTrue(wgMgr.getIdToWorkloadGroup().get(wgId2) == null);
@@ -445,48 +442,15 @@ public class WorkloadGroupMgrTest {
String wgName12 = "wg2";
Map<String, String> prop3 = Maps.newHashMap();
prop3.put(WorkloadGroup.COMPUTE_GROUP, cgName1);
- prop3.put(propName, "71%");
- try {
- workloadGroupMgr.createWorkloadGroup(cgName1, new
WorkloadGroup(3, wgName12, prop3), false);
- Assert.fail();
- } catch (DdlException e) {
- Assert.assertTrue(e.getMessage().contains("can not be
greater"));
- }
-
- // reset limit, then create should succ
prop3.put(propName, "70%");
workloadGroupMgr.createWorkloadGroup(cgName1, new WorkloadGroup(3,
wgName12, prop3), false);
- // create cg1.wg3, it should be failed and sum value should be 100
- String wgName13 = "wg3";
- Map<String, String> prop4 = Maps.newHashMap();
- prop4.put(WorkloadGroup.COMPUTE_GROUP, cgName1);
- prop4.put(propName, "1%");
- try {
- workloadGroupMgr.createWorkloadGroup(cgName1, new
WorkloadGroup(4, wgName13, prop4), false);
- Assert.fail();
- } catch (DdlException e) {
- Assert.assertTrue(e.getMessage().contains("current sum
val:101"));
- }
-
// create cg2.wg2 with limit 20%, it should be succ.
String wgName22 = "wg2";
Map<String, String> prop5 = Maps.newHashMap();
prop5.put(WorkloadGroup.COMPUTE_GROUP, cgName2);
prop5.put(propName, "9%");
workloadGroupMgr.createWorkloadGroup(cgName2, new WorkloadGroup(5,
wgName22, prop5), false);
-
- // create cg2.wg3 with limit 60, it should be failed.
- String wgName23 = "wg3";
- Map<String, String> prop6 = Maps.newHashMap();
- prop6.put(WorkloadGroup.COMPUTE_GROUP, cgName2);
- prop6.put(propName, "30%");
- try {
- workloadGroupMgr.createWorkloadGroup(cgName2, new
WorkloadGroup(6, wgName23, prop6), false);
- } catch (DdlException e) {
- Assert.assertTrue(e.getMessage().contains("current sum
val:110"));
- }
-
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
index e75f1ef904a..720661c3778 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
@@ -39,7 +39,6 @@ public class WorkloadGroupTest {
String name1 = "g1";
WorkloadGroup group1 = WorkloadGroup.create(name1, properties1);
Assert.assertEquals(name1, group1.getName());
- Assert.assertEquals(8, group1.getProperties().size());
Assert.assertTrue(group1.getProperties().containsKey(WorkloadGroup.CPU_SHARE));
Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) <
1e-6);
}
@@ -86,7 +85,7 @@ public class WorkloadGroupTest {
WorkloadGroup group1 = WorkloadGroup.create(name1, properties1);
BaseProcResult result = new BaseProcResult();
- group1.getProcNodeData(result, null);
+ group1.getProcNodeData(result);
List<List<String>> rows = result.getRows();
Assert.assertEquals(1, rows.size());
}
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out
b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index ea8dac8b9c9..5135555a6dd 100644
Binary files a/regression-test/data/workload_manager_p0/test_curd_wlg.out and
b/regression-test/data/workload_manager_p0/test_curd_wlg.out differ
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 0f2cecc5690..ae79dabd0d0 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -152,12 +152,6 @@ suite("test_crud_wlg") {
sql "drop workload group test_drop_wg $forComputeGroupStr"
qt_show_del_wg_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in
('normal','test_group','test_drop_wg') order by name;"
- // test memory_limit
- test {
- sql "alter workload group test_group $forComputeGroupStr properties (
'memory_limit'='100%' );"
-
- exception "can not be greater than"
- }
sql "alter workload group test_group $forComputeGroupStr properties (
'memory_limit'='11%' );"
qt_mem_limit_1 """ select count(1) from ${table_name} """
@@ -247,29 +241,6 @@ suite("test_crud_wlg") {
exception "The allowed cpu_share value is -1 or a positive integer"
}
- // failed for mem_limit
- test {
- sql "create workload group if not exists test_group2
$forComputeGroupStr " +
- "properties ( " +
- " 'cpu_share'='10', " +
- " 'memory_limit'='200%', " +
- " 'enable_memory_overcommit'='true' " +
- ");"
-
- exception "can not be greater than"
- }
-
- test {
- sql "create workload group if not exists test_group2
$forComputeGroupStr " +
- "properties ( " +
- " 'cpu_share'='10', " +
- " 'memory_limit'='99%', " +
- " 'enable_memory_overcommit'='true' " +
- ");"
-
- exception "can not be greater than"
- }
-
// failed for mem_overcommit
test {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]