This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 65a65e7a332 branch-3.0: [fix](job) use cluster name rather than 
cluster id to find available be (#52911) (#55563)
65a65e7a332 is described below

commit 65a65e7a3327b3a770f6bf9a7b126552b2f66464
Author: hui lai <[email protected]>
AuthorDate: Tue Sep 2 21:19:20 2025 +0800

    branch-3.0: [fix](job) use cluster name rather than cluster id to find 
available be (#52911) (#55563)
    
    pick #52911
    
    In a multi-cluster scenario, users are aware of the cluster name, so the
    cluster name rather than the cluster id should be used to find available
    BE nodes.
---
 .../doris/cloud/load/CloudRoutineLoadManager.java  | 12 ++---
 .../doris/load/routineload/RoutineLoadJob.java     | 52 ++++++++++------------
 2 files changed, 27 insertions(+), 37 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
index eff1c345e5a..399b33e32d4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
@@ -51,14 +51,8 @@ public class CloudRoutineLoadManager extends 
RoutineLoadManager {
     @Override
     protected List<Long> getAvailableBackendIds(long jobId) throws 
LoadException {
         RoutineLoadJob routineLoadJob = getJob(jobId);
-        String cloudClusterId = routineLoadJob.getCloudClusterId();
-        if (Strings.isNullOrEmpty(cloudClusterId)) {
-            LOG.warn("cluster id is empty");
-            throw new LoadException("cluster id is empty");
-        }
-
         return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                .getBackendsByClusterId(cloudClusterId)
+                .getBackendsByClusterName(routineLoadJob.getCloudCluster())
                 .stream()
                 .filter(Backend::isAlive)
                 .map(Backend::getId)
@@ -67,13 +61,13 @@ public class CloudRoutineLoadManager extends 
RoutineLoadManager {
 
     @Override
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
-        routineLoadJob.setCloudClusterById();
+        routineLoadJob.setCloudCluster();
         super.replayCreateRoutineLoadJob(routineLoadJob);
     }
 
     @Override
     public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
-        getJob(operation.getId()).setCloudClusterById();
+        getJob(operation.getId()).setCloudCluster();
         super.replayChangeRoutineLoadJob(operation);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 9c83bb6e9f9..46460e1b82a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -292,6 +292,8 @@ public abstract class RoutineLoadJob
     protected byte escape = 0;
 
     // use for cloud cluster mode
+    protected String qualifiedUser;
+    @SerializedName("ccn")
     protected String cloudCluster;
 
     public void setTypeRead(boolean isTypeRead) {
@@ -303,6 +305,13 @@ public abstract class RoutineLoadJob
         this.dataSourceType = type;
         if (ConnectContext.get() != null) {
             this.memtableOnSinkNode = 
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+            if (Config.isCloudMode()) {
+                try {
+                    this.cloudCluster = ConnectContext.get().getCloudCluster();
+                } catch (ComputeGroupException e) {
+                    LOG.warn("failed to get cloud cluster", e);
+                }
+            }
         }
     }
 
@@ -320,6 +329,13 @@ public abstract class RoutineLoadJob
             SessionVariable var = ConnectContext.get().getSessionVariable();
             sessionVariables.put(SessionVariable.SQL_MODE, 
Long.toString(var.getSqlMode()));
             this.memtableOnSinkNode = 
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+            if (Config.isCloudMode()) {
+                try {
+                    this.cloudCluster = ConnectContext.get().getCloudCluster();
+                } catch (ComputeGroupException e) {
+                    LOG.warn("failed to get cloud cluster", e);
+                }
+            }
         } else {
             sessionVariables.put(SessionVariable.SQL_MODE, 
String.valueOf(SqlModeHelper.MODE_DEFAULT));
         }
@@ -1010,21 +1026,13 @@ public abstract class RoutineLoadJob
         table.readLock();
         try {
             if (Config.isCloudMode()) {
-                String clusterName = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
-                        .getClusterNameByClusterId(cloudClusterId);
-                if (Strings.isNullOrEmpty(clusterName)) {
-                    String err = String.format("cluster name is empty, cluster 
id is %s", cloudClusterId);
-                    LOG.warn(err);
-                    throw new UserException(err);
-                }
-
                 if (ConnectContext.get() == null) {
                     ConnectContext ctx = new ConnectContext();
                     ctx.setThreadLocalInfo();
-                    ctx.setCloudCluster(clusterName);
+                    ctx.setCloudCluster(cloudCluster);
                     needCleanCtx = true;
                 } else {
-                    ConnectContext.get().setCloudCluster(clusterName);
+                    ConnectContext.get().setCloudCluster(cloudCluster);
                 }
             }
 
@@ -1599,27 +1607,15 @@ public abstract class RoutineLoadJob
         this.origStmt = origStmt;
     }
 
-    public void setCloudCluster(String cloudClusterName) throws UserException {
-        if (Strings.isNullOrEmpty(cloudClusterName)) {
-            LOG.warn("cluster name is empty");
-            throw new UserException("cluster name is empty");
+    public void setCloudCluster() {
+        if (Strings.isNullOrEmpty(cloudCluster)) {
+            this.cloudCluster = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
+                    .getClusterNameByClusterId(cloudClusterId);
         }
-
-        this.cloudClusterId = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
-                .getCloudClusterIdByName(cloudClusterName);
-        if (Strings.isNullOrEmpty(this.cloudClusterId)) {
-            LOG.warn("cluster id is empty, cluster name {}", cloudClusterName);
-            throw new UserException("cluster id is empty, cluster name: " + 
cloudClusterName);
-        }
-    }
-
-    public String getCloudClusterId() {
-        return cloudClusterId;
     }
 
-    public void setCloudClusterById() {
-        this.cloudCluster = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
-                        .getClusterNameByClusterId(cloudClusterId);
+    public void setCloudCluster(String cloudCluster) {
+        this.cloudCluster = cloudCluster;
     }
 
     // check the correctness of commit info


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to