This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d3f47245ee6 [fix](job) use cluster name rather than cluster id to find
available be (#52911)
d3f47245ee6 is described below
commit d3f47245ee6852168e0749fdfce57182a10696a1
Author: hui lai <[email protected]>
AuthorDate: Fri Jul 11 11:43:49 2025 +0800
[fix](job) use cluster name rather than cluster id to find available be
(#52911)
### What problem does this PR solve?
In a multi-cluster scenario, users see and work with cluster names, so the
cluster name rather than the cluster id should be used to find available BE nodes.
---
.../doris/cloud/load/CloudRoutineLoadManager.java | 12 ++---
.../doris/load/routineload/KafkaTaskInfo.java | 12 +----
.../doris/load/routineload/RoutineLoadJob.java | 51 ++++++++++------------
3 files changed, 27 insertions(+), 48 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
index eff1c345e5a..399b33e32d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
@@ -51,14 +51,8 @@ public class CloudRoutineLoadManager extends
RoutineLoadManager {
@Override
protected List<Long> getAvailableBackendIds(long jobId) throws
LoadException {
RoutineLoadJob routineLoadJob = getJob(jobId);
- String cloudClusterId = routineLoadJob.getCloudClusterId();
- if (Strings.isNullOrEmpty(cloudClusterId)) {
- LOG.warn("cluster id is empty");
- throw new LoadException("cluster id is empty");
- }
-
return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getBackendsByClusterId(cloudClusterId)
+ .getBackendsByClusterName(routineLoadJob.getCloudCluster())
.stream()
.filter(Backend::isAlive)
.map(Backend::getId)
@@ -67,13 +61,13 @@ public class CloudRoutineLoadManager extends
RoutineLoadManager {
@Override
public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
- routineLoadJob.setCloudClusterById();
+ routineLoadJob.setCloudCluster();
super.replayCreateRoutineLoadJob(routineLoadJob);
}
@Override
public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
- getJob(operation.getId()).setCloudClusterById();
+ getJob(operation.getId()).setCloudCluster();
super.replayChangeRoutineLoadJob(operation);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 110f45a1ad2..b869550406e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@@ -37,7 +36,6 @@ import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@@ -151,15 +149,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
ConnectContext tmpContext = new ConnectContext();
if (Config.isCloudMode()) {
- String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
-
.getClusterNameByClusterId(routineLoadJob.getCloudClusterId());
- if (Strings.isNullOrEmpty(clusterName)) {
- LOG.warn("cluster name is empty, cluster id is {}, job
id is {}",
- routineLoadJob.getCloudClusterId(),
routineLoadJob.getTxnId());
- throw new UserException(String.format("cluster name is
empty, cluster id is %s",
- routineLoadJob.getCloudClusterId()));
- }
- tmpContext.setCloudCluster(clusterName);
+
tmpContext.setCloudCluster(routineLoadJob.getCloudCluster());
}
tmpContext.setCurrentUserIdentity(routineLoadJob.getUserIdentity());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index cd1808e4b5b..eb51067de9f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -282,6 +282,7 @@ public abstract class RoutineLoadJob
// use for cloud cluster mode
protected String qualifiedUser;
+ @SerializedName("ccn")
protected String cloudCluster;
public RoutineLoadJob(long id, LoadDataSourceType type) {
@@ -289,6 +290,13 @@ public abstract class RoutineLoadJob
this.dataSourceType = type;
if (ConnectContext.get() != null) {
this.memtableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ if (Config.isCloudMode()) {
+ try {
+ this.cloudCluster = ConnectContext.get().getCloudCluster();
+ } catch (ComputeGroupException e) {
+ LOG.warn("failed to get cloud cluster", e);
+ }
+ }
}
}
@@ -306,6 +314,13 @@ public abstract class RoutineLoadJob
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE,
Long.toString(var.getSqlMode()));
this.memtableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ if (Config.isCloudMode()) {
+ try {
+ this.cloudCluster = ConnectContext.get().getCloudCluster();
+ } catch (ComputeGroupException e) {
+ LOG.warn("failed to get cloud cluster", e);
+ }
+ }
} else {
sessionVariables.put(SessionVariable.SQL_MODE,
String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
@@ -1010,21 +1025,13 @@ public abstract class RoutineLoadJob
table.readLock();
try {
if (Config.isCloudMode()) {
- String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getClusterNameByClusterId(cloudClusterId);
- if (Strings.isNullOrEmpty(clusterName)) {
- String err = String.format("cluster name is empty, cluster
id is %s", cloudClusterId);
- LOG.warn(err);
- throw new UserException(err);
- }
-
if (ConnectContext.get() == null) {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
- ctx.setCloudCluster(clusterName);
+ ctx.setCloudCluster(cloudCluster);
needCleanCtx = true;
} else {
- ConnectContext.get().setCloudCluster(clusterName);
+ ConnectContext.get().setCloudCluster(cloudCluster);
}
ConnectContext.get().setCurrentUserIdentity(this.getUserIdentity());
} else {
@@ -1609,27 +1616,15 @@ public abstract class RoutineLoadJob
this.origStmt = origStmt;
}
- public void setCloudCluster(String cloudClusterName) throws UserException {
- if (Strings.isNullOrEmpty(cloudClusterName)) {
- LOG.warn("cluster name is empty");
- throw new UserException("cluster name is empty");
+ public void setCloudCluster() {
+ if (this.cloudCluster.isEmpty()) {
+ this.cloudCluster = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getClusterNameByClusterId(cloudClusterId);
}
-
- this.cloudClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getCloudClusterIdByName(cloudClusterName);
- if (Strings.isNullOrEmpty(this.cloudClusterId)) {
- LOG.warn("cluster id is empty, cluster name {}", cloudClusterName);
- throw new UserException("cluster id is empty, cluster name: " +
cloudClusterName);
- }
- }
-
- public String getCloudClusterId() {
- return cloudClusterId;
}
- public void setCloudClusterById() {
- this.cloudCluster = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getClusterNameByClusterId(cloudClusterId);
+ public void setCloudCluster(String cloudCluster) {
+ this.cloudCluster = cloudCluster;
}
// check the correctness of commit info
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]