This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 895b3a6f7abac9ca0be92f39f87eadf796490f74 Author: wangbo <[email protected]> AuthorDate: Wed Apr 17 09:48:37 2024 +0800 [Fix](executor)Fix routine load failed when can not find group (#33596) --- .../doris/load/routineload/KafkaTaskInfo.java | 52 +++++++++++++--------- .../resource/workloadgroup/WorkloadGroupMgr.java | 17 +++++-- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 86a084764ea..deb6749e677 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -139,17 +139,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); - long wgId = routineLoadJob.getWorkloadId(); - List<TPipelineWorkloadGroup> tWgList = new ArrayList<>(); - if (wgId > 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupById(wgId); - } - if (tWgList.size() == 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity()); + if (Config.enable_workload_group) { + long wgId = routineLoadJob.getWorkloadId(); + List<TPipelineWorkloadGroup> tWgList = new ArrayList<>(); + if (wgId > 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupById(wgId); + if (tWgList.size() == 0) { + throw new UserException("can not find workload group, id=" + wgId); + } + } else { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroupByUser(routineLoadJob.getUserIdentity()); + } + if (tWgList.size() != 0) { + tExecPlanFragmentParams.setWorkloadGroups(tWgList); + } } - tExecPlanFragmentParams.setWorkloadGroups(tWgList); return 
tExecPlanFragmentParams; } @@ -166,17 +172,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); - long wgId = routineLoadJob.getWorkloadId(); - List<TPipelineWorkloadGroup> tWgList = new ArrayList<>(); - if (wgId > 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupById(wgId); - } - if (tWgList.size() == 0) { - tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity()); + if (Config.enable_workload_group) { + long wgId = routineLoadJob.getWorkloadId(); + List<TPipelineWorkloadGroup> tWgList = new ArrayList<>(); + if (wgId > 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupById(wgId); + if (tWgList.size() == 0) { + throw new UserException("can not find workload group, id=" + wgId); + } + } else { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroupByUser(routineLoadJob.getUserIdentity()); + } + if (tWgList.size() != 0) { + tExecPlanFragmentParams.setWorkloadGroups(tWgList); + } } - tExecPlanFragmentParams.setWorkloadGroups(tWgList); return tExecPlanFragmentParams; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 28648ef25e2..7796a385eee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -223,14 +223,23 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { return tWorkloadGroups; } - public List<TPipelineWorkloadGroup> getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException { + public List<TPipelineWorkloadGroup> 
getWorkloadGroupByUser(UserIdentity user) throws UserException { String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser()); List<TPipelineWorkloadGroup> ret = new ArrayList<>(); + WorkloadGroup wg = null; readLock(); try { - WorkloadGroup wg = nameToWorkloadGroup.get(groupName); - if (wg == null) { - throw new UserException("can not find workload group " + groupName); + if (groupName == null || groupName.isEmpty()) { + wg = nameToWorkloadGroup.get(DEFAULT_GROUP_NAME); + if (wg == null) { + throw new RuntimeException("can not find normal workload group for routineload"); + } + } else { + wg = nameToWorkloadGroup.get(groupName); + if (wg == null) { + throw new UserException( + "can not find workload group " + groupName + " for user " + user.getQualifiedUser()); + } } ret.add(wg.toThrift()); } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
