This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f47245ee6 [fix](job) use cluster name rather than cluster id to find available be (#52911)
d3f47245ee6 is described below

commit d3f47245ee6852168e0749fdfce57182a10696a1
Author: hui lai <[email protected]>
AuthorDate: Fri Jul 11 11:43:49 2025 +0800

    [fix](job) use cluster name rather than cluster id to find available be (#52911)
    
    ### What problem does this PR solve?
    
    In a multi-cluster scenario, users perceive the cluster name, so the cluster name
    rather than the cluster id should be used to find available BE nodes.
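
    For illustration only, a minimal, self-contained sketch of the idea: look up
    alive backends by the user-visible cluster name rather than by an internal
    cluster id. All names below (Backend, availableBackendIds, the map) are
    hypothetical stand-ins, not the Doris classes touched by this patch.

        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.List;
        import java.util.Map;

        public class ClusterNameLookupSketch {
            // Minimal stand-in for a backend node.
            static final class Backend {
                final long id;
                final boolean alive;
                Backend(long id, boolean alive) { this.id = id; this.alive = alive; }
            }

            // Return ids of alive backends, keyed by the cluster name the user knows.
            static List<Long> availableBackendIds(Map<String, List<Backend>> backendsByName,
                                                  String clusterName) {
                List<Long> ids = new ArrayList<>();
                for (Backend be : backendsByName.getOrDefault(clusterName, Collections.emptyList())) {
                    if (be.alive) {                 // skip dead nodes
                        ids.add(be.id);
                    }
                }
                return ids;
            }
        }
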
---
 .../doris/cloud/load/CloudRoutineLoadManager.java  | 12 ++---
 .../doris/load/routineload/KafkaTaskInfo.java      | 12 +----
 .../doris/load/routineload/RoutineLoadJob.java     | 51 ++++++++++------------
 3 files changed, 27 insertions(+), 48 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
index eff1c345e5a..399b33e32d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
@@ -51,14 +51,8 @@ public class CloudRoutineLoadManager extends RoutineLoadManager {
     @Override
     protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
         RoutineLoadJob routineLoadJob = getJob(jobId);
-        String cloudClusterId = routineLoadJob.getCloudClusterId();
-        if (Strings.isNullOrEmpty(cloudClusterId)) {
-            LOG.warn("cluster id is empty");
-            throw new LoadException("cluster id is empty");
-        }
-
         return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                .getBackendsByClusterId(cloudClusterId)
+                .getBackendsByClusterName(routineLoadJob.getCloudCluster())
                 .stream()
                 .filter(Backend::isAlive)
                 .map(Backend::getId)
@@ -67,13 +61,13 @@ public class CloudRoutineLoadManager extends RoutineLoadManager {
 
     @Override
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
-        routineLoadJob.setCloudClusterById();
+        routineLoadJob.setCloudCluster();
         super.replayCreateRoutineLoadJob(routineLoadJob);
     }
 
     @Override
     public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
-        getJob(operation.getId()).setCloudClusterById();
+        getJob(operation.getId()).setCloudCluster();
         super.replayChangeRoutineLoadJob(operation);
     }
 }
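
The two replay overrides above now rely on the no-argument setCloudCluster() that this
patch adds to RoutineLoadJob (see the last file below): when a job was persisted without
a cluster name, the name is recovered from the legacy cluster id. A rough, self-contained
sketch of that back-fill intent follows; NAME_BY_ID is an illustrative stand-in for
CloudSystemInfoService.getClusterNameByClusterId, not the real service.

    import java.util.Map;

    public class ReplayBackfillSketch {
        // Illustrative id-to-name mapping; the real lookup lives in CloudSystemInfoService.
        static final Map<String, String> NAME_BY_ID = Map.of("cluster_id_1", "my_cluster");

        // Only back-fill when no name was persisted with the job.
        static String backfillClusterName(String persistedName, String legacyClusterId) {
            if (persistedName == null || persistedName.isEmpty()) {
                return NAME_BY_ID.getOrDefault(legacyClusterId, "");
            }
            return persistedName;
        }

        public static void main(String[] args) {
            System.out.println(backfillClusterName("", "cluster_id_1"));  // my_cluster
            System.out.println(backfillClusterName("my_cluster", "any")); // my_cluster
        }
    }
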
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 110f45a1ad2..b869550406e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
@@ -37,7 +36,6 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
 import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
@@ -151,15 +149,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
 
                 ConnectContext tmpContext = new ConnectContext();
                 if (Config.isCloudMode()) {
-                    String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                            .getClusterNameByClusterId(routineLoadJob.getCloudClusterId());
-                    if (Strings.isNullOrEmpty(clusterName)) {
-                        LOG.warn("cluster name is empty, cluster id is {}, job id is {}",
-                                routineLoadJob.getCloudClusterId(), routineLoadJob.getTxnId());
-                        throw new UserException(String.format("cluster name is empty, cluster id is %s",
-                                routineLoadJob.getCloudClusterId()));
-                    }
-                    tmpContext.setCloudCluster(clusterName);
+                    tmpContext.setCloudCluster(routineLoadJob.getCloudCluster());
                 }
                 tmpContext.setCurrentUserIdentity(routineLoadJob.getUserIdentity());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index cd1808e4b5b..eb51067de9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -282,6 +282,7 @@ public abstract class RoutineLoadJob
 
     // use for cloud cluster mode
     protected String qualifiedUser;
+    @SerializedName("ccn")
     protected String cloudCluster;
 
     public RoutineLoadJob(long id, LoadDataSourceType type) {
@@ -289,6 +290,13 @@ public abstract class RoutineLoadJob
         this.dataSourceType = type;
         if (ConnectContext.get() != null) {
             this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+            if (Config.isCloudMode()) {
+                try {
+                    this.cloudCluster = ConnectContext.get().getCloudCluster();
+                } catch (ComputeGroupException e) {
+                    LOG.warn("failed to get cloud cluster", e);
+                }
+            }
         }
     }
 
@@ -306,6 +314,13 @@ public abstract class RoutineLoadJob
             SessionVariable var = ConnectContext.get().getSessionVariable();
             sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
             this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+            if (Config.isCloudMode()) {
+                try {
+                    this.cloudCluster = ConnectContext.get().getCloudCluster();
+                } catch (ComputeGroupException e) {
+                    LOG.warn("failed to get cloud cluster", e);
+                }
+            }
         } else {
             sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
         }
@@ -1010,21 +1025,13 @@ public abstract class RoutineLoadJob
         table.readLock();
         try {
             if (Config.isCloudMode()) {
-                String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                        .getClusterNameByClusterId(cloudClusterId);
-                if (Strings.isNullOrEmpty(clusterName)) {
-                    String err = String.format("cluster name is empty, cluster id is %s", cloudClusterId);
-                    LOG.warn(err);
-                    throw new UserException(err);
-                }
-
                 if (ConnectContext.get() == null) {
                     ConnectContext ctx = new ConnectContext();
                     ctx.setThreadLocalInfo();
-                    ctx.setCloudCluster(clusterName);
+                    ctx.setCloudCluster(cloudCluster);
                     needCleanCtx = true;
                 } else {
-                    ConnectContext.get().setCloudCluster(clusterName);
+                    ConnectContext.get().setCloudCluster(cloudCluster);
                 }
                 ConnectContext.get().setCurrentUserIdentity(this.getUserIdentity());
             } else {
@@ -1609,27 +1616,15 @@ public abstract class RoutineLoadJob
         this.origStmt = origStmt;
     }
 
-    public void setCloudCluster(String cloudClusterName) throws UserException {
-        if (Strings.isNullOrEmpty(cloudClusterName)) {
-            LOG.warn("cluster name is empty");
-            throw new UserException("cluster name is empty");
+    public void setCloudCluster() {
+        if (this.cloudCluster.isEmpty()) {
+            this.cloudCluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                    .getClusterNameByClusterId(cloudClusterId);
         }
-
-        this.cloudClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                .getCloudClusterIdByName(cloudClusterName);
-        if (Strings.isNullOrEmpty(this.cloudClusterId)) {
-            LOG.warn("cluster id is empty, cluster name {}", cloudClusterName);
-            throw new UserException("cluster id is empty, cluster name: " + cloudClusterName);
-        }
-    }
-
-    public String getCloudClusterId() {
-        return cloudClusterId;
     }
 
-    public void setCloudClusterById() {
-        this.cloudCluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                        .getClusterNameByClusterId(cloudClusterId);
+    public void setCloudCluster(String cloudCluster) {
+        this.cloudCluster = cloudCluster;
     }
 
     // check the correctness of commit info
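
The cluster name is now persisted with the job via @SerializedName("ccn"), while jobs
written before this change carry only the cluster id; that is why replay calls the
no-argument setCloudCluster() shown above. Below is a small Gson sketch of what
deserializing such entries looks like; PersistedJob and the "cci" key are hypothetical
stand-ins, only the "ccn" key comes from this patch.

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    public class CcnPersistenceSketch {
        // Illustrative stand-in for the persisted slice of a routine load job.
        static class PersistedJob {
            @SerializedName("cci")   // hypothetical key for the legacy cluster id
            String cloudClusterId;
            @SerializedName("ccn")   // key introduced by this patch for the cluster name
            String cloudCluster;
        }

        public static void main(String[] args) {
            // An entry written before this patch has no "ccn" field ...
            PersistedJob old = new Gson().fromJson("{\"cci\":\"cluster_id_1\"}", PersistedJob.class);
            System.out.println(old.cloudCluster);   // prints: null
            // ... so the name must be back-filled from the id during replay.

            // An entry written after this patch already carries the name.
            PersistedJob fresh = new Gson().fromJson("{\"ccn\":\"my_cluster\"}", PersistedJob.class);
            System.out.println(fresh.cloudCluster); // prints: my_cluster
        }
    }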


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
