This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 895b3a6f7abac9ca0be92f39f87eadf796490f74
Author: wangbo <[email protected]>
AuthorDate: Wed Apr 17 09:48:37 2024 +0800

    [Fix](executor)Fix routine load failed when can not find group (#33596)
---
 .../doris/load/routineload/KafkaTaskInfo.java      | 52 +++++++++++++---------
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 17 +++++--
 2 files changed, 45 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 86a084764ea..deb6749e677 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -139,17 +139,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
         tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
 
-        long wgId = routineLoadJob.getWorkloadId();
-        List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
-        if (wgId > 0) {
-            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
-                    .getTWorkloadGroupById(wgId);
-        }
-        if (tWgList.size() == 0) {
-            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
-                    
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+        if (Config.enable_workload_group) {
+            long wgId = routineLoadJob.getWorkloadId();
+            List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+            if (wgId > 0) {
+                tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                        .getTWorkloadGroupById(wgId);
+                if (tWgList.size() == 0) {
+                    throw new UserException("can not find workload group, id=" 
+ wgId);
+                }
+            } else {
+                tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                        
.getWorkloadGroupByUser(routineLoadJob.getUserIdentity());
+            }
+            if (tWgList.size() != 0) {
+                tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+            }
         }
-        tExecPlanFragmentParams.setWorkloadGroups(tWgList);
 
         return tExecPlanFragmentParams;
     }
@@ -166,17 +172,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
         tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
 
-        long wgId = routineLoadJob.getWorkloadId();
-        List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
-        if (wgId > 0) {
-            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
-                    .getTWorkloadGroupById(wgId);
-        }
-        if (tWgList.size() == 0) {
-            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
-                    
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+        if (Config.enable_workload_group) {
+            long wgId = routineLoadJob.getWorkloadId();
+            List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+            if (wgId > 0) {
+                tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                        .getTWorkloadGroupById(wgId);
+                if (tWgList.size() == 0) {
+                    throw new UserException("can not find workload group, id=" 
+ wgId);
+                }
+            } else {
+                tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                        
.getWorkloadGroupByUser(routineLoadJob.getUserIdentity());
+            }
+            if (tWgList.size() != 0) {
+                tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+            }
         }
-        tExecPlanFragmentParams.setWorkloadGroups(tWgList);
 
         return tExecPlanFragmentParams;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 28648ef25e2..7796a385eee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -223,14 +223,23 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
         return tWorkloadGroups;
     }
 
-    public List<TPipelineWorkloadGroup> 
getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException {
+    public List<TPipelineWorkloadGroup> getWorkloadGroupByUser(UserIdentity 
user) throws UserException {
         String groupName = 
Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser());
         List<TPipelineWorkloadGroup> ret = new ArrayList<>();
+        WorkloadGroup wg = null;
         readLock();
         try {
-            WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
-            if (wg == null) {
-                throw new UserException("can not find workload group " + 
groupName);
+            if (groupName == null || groupName.isEmpty()) {
+                wg = nameToWorkloadGroup.get(DEFAULT_GROUP_NAME);
+                if (wg == null) {
+                    throw new RuntimeException("can not find normal workload 
group for routineload");
+                }
+            } else {
+                wg = nameToWorkloadGroup.get(groupName);
+                if (wg == null) {
+                    throw new UserException(
+                            "can not find workload group " + groupName + " for 
user " + user.getQualifiedUser());
+                }
             }
             ret.add(wg.toThrift());
         } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to