This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 65a65e7a332 branch-3.0: [fix](job) use cluster name rather than
cluster id to find available be (#52911) (#55563)
65a65e7a332 is described below
commit 65a65e7a3327b3a770f6bf9a7b126552b2f66464
Author: hui lai <[email protected]>
AuthorDate: Tue Sep 2 21:19:20 2025 +0800
branch-3.0: [fix](job) use cluster name rather than cluster id to find
available be (#52911) (#55563)
pick #52911
In a multi-cluster scenario, users identify a cluster by its name, so the
cluster name, rather than the cluster id, should be used to find available BE nodes.
---
.../doris/cloud/load/CloudRoutineLoadManager.java | 12 ++---
.../doris/load/routineload/RoutineLoadJob.java | 52 ++++++++++------------
2 files changed, 27 insertions(+), 37 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
index eff1c345e5a..399b33e32d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
@@ -51,14 +51,8 @@ public class CloudRoutineLoadManager extends
RoutineLoadManager {
@Override
protected List<Long> getAvailableBackendIds(long jobId) throws
LoadException {
RoutineLoadJob routineLoadJob = getJob(jobId);
- String cloudClusterId = routineLoadJob.getCloudClusterId();
- if (Strings.isNullOrEmpty(cloudClusterId)) {
- LOG.warn("cluster id is empty");
- throw new LoadException("cluster id is empty");
- }
-
return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getBackendsByClusterId(cloudClusterId)
+ .getBackendsByClusterName(routineLoadJob.getCloudCluster())
.stream()
.filter(Backend::isAlive)
.map(Backend::getId)
@@ -67,13 +61,13 @@ public class CloudRoutineLoadManager extends
RoutineLoadManager {
@Override
public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
- routineLoadJob.setCloudClusterById();
+ routineLoadJob.setCloudCluster();
super.replayCreateRoutineLoadJob(routineLoadJob);
}
@Override
public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
- getJob(operation.getId()).setCloudClusterById();
+ getJob(operation.getId()).setCloudCluster();
super.replayChangeRoutineLoadJob(operation);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 9c83bb6e9f9..46460e1b82a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -292,6 +292,8 @@ public abstract class RoutineLoadJob
protected byte escape = 0;
// use for cloud cluster mode
+ protected String qualifiedUser;
+ @SerializedName("ccn")
protected String cloudCluster;
public void setTypeRead(boolean isTypeRead) {
@@ -303,6 +305,13 @@ public abstract class RoutineLoadJob
this.dataSourceType = type;
if (ConnectContext.get() != null) {
this.memtableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ if (Config.isCloudMode()) {
+ try {
+ this.cloudCluster = ConnectContext.get().getCloudCluster();
+ } catch (ComputeGroupException e) {
+ LOG.warn("failed to get cloud cluster", e);
+ }
+ }
}
}
@@ -320,6 +329,13 @@ public abstract class RoutineLoadJob
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE,
Long.toString(var.getSqlMode()));
this.memtableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ if (Config.isCloudMode()) {
+ try {
+ this.cloudCluster = ConnectContext.get().getCloudCluster();
+ } catch (ComputeGroupException e) {
+ LOG.warn("failed to get cloud cluster", e);
+ }
+ }
} else {
sessionVariables.put(SessionVariable.SQL_MODE,
String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
@@ -1010,21 +1026,13 @@ public abstract class RoutineLoadJob
table.readLock();
try {
if (Config.isCloudMode()) {
- String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getClusterNameByClusterId(cloudClusterId);
- if (Strings.isNullOrEmpty(clusterName)) {
- String err = String.format("cluster name is empty, cluster
id is %s", cloudClusterId);
- LOG.warn(err);
- throw new UserException(err);
- }
-
if (ConnectContext.get() == null) {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
- ctx.setCloudCluster(clusterName);
+ ctx.setCloudCluster(cloudCluster);
needCleanCtx = true;
} else {
- ConnectContext.get().setCloudCluster(clusterName);
+ ConnectContext.get().setCloudCluster(cloudCluster);
}
}
@@ -1599,27 +1607,15 @@ public abstract class RoutineLoadJob
this.origStmt = origStmt;
}
- public void setCloudCluster(String cloudClusterName) throws UserException {
- if (Strings.isNullOrEmpty(cloudClusterName)) {
- LOG.warn("cluster name is empty");
- throw new UserException("cluster name is empty");
+ public void setCloudCluster() {
+ if (Strings.isNullOrEmpty(cloudCluster)) {
+ this.cloudCluster = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getClusterNameByClusterId(cloudClusterId);
}
-
- this.cloudClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getCloudClusterIdByName(cloudClusterName);
- if (Strings.isNullOrEmpty(this.cloudClusterId)) {
- LOG.warn("cluster id is empty, cluster name {}", cloudClusterName);
- throw new UserException("cluster id is empty, cluster name: " +
cloudClusterName);
- }
- }
-
- public String getCloudClusterId() {
- return cloudClusterId;
}
- public void setCloudClusterById() {
- this.cloudCluster = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getClusterNameByClusterId(cloudClusterId);
+ public void setCloudCluster(String cloudCluster) {
+ this.cloudCluster = cloudCluster;
}
// check the correctness of commit info
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]