This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 01a54a0e838 branch-3.1: [feature](stream-load) support stream load
endpoint redirect policy #53104 (#53208)
01a54a0e838 is described below
commit 01a54a0e838ec5b7522f22406562522d8e62417e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 15 11:49:49 2025 +0800
branch-3.1: [feature](stream-load) support stream load endpoint redirect
policy #53104 (#53208)
Cherry-picked from #53104
Co-authored-by: Xin Liao <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 6 +-
.../doris/cloud/catalog/CloudClusterChecker.java | 45 +++--
.../doris/cloud/system/CloudSystemInfoService.java | 72 +++++++-
.../org/apache/doris/httpv2/rest/LoadAction.java | 66 +++++--
.../main/java/org/apache/doris/resource/Tag.java | 14 +-
.../main/java/org/apache/doris/system/Backend.java | 45 +++--
gensrc/proto/cloud.proto | 2 +
.../doris/regression/suite/SuiteCluster.groovy | 4 +
.../stream_load/test_stream_load_endpoint.groovy | 198 +++++++++++++++++++++
9 files changed, 399 insertions(+), 53 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5b0fe49de3d..2d3f8fefd85 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3276,8 +3276,10 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int audit_event_log_queue_size = 250000;
- @ConfField(description = {"存算分离模式下streamload导入使用的转发策略,
可选值为public-private或者空",
- "streamload route policy in cloud mode, availale options are
public-private and empty string"})
+ @ConfField(mutable = true, description = {
+ "streamload导入使用的转发策略,
可选值为public-private/public/private/direct/random-be/空",
+ "streamload route policy, available options are "
+ + "public-private/public/private/direct/random-be and empty
string" })
public static String streamload_redirect_policy = "";
@ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index ca81b165cb9..4ac8dc0dc03 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -101,8 +101,8 @@ public class CloudClusterChecker extends MasterDaemon {
// Attach tag to BEs
String clusterName =
remoteClusterIdToPB.get(addId).getClusterName();
String clusterId =
remoteClusterIdToPB.get(addId).getClusterId();
- String publicEndpoint =
remoteClusterIdToPB.get(addId).getPublicEndpoint();
- String privateEndpoint =
remoteClusterIdToPB.get(addId).getPrivateEndpoint();
+ String clusterPublicEndpoint =
remoteClusterIdToPB.get(addId).getPublicEndpoint();
+ String clusterPrivateEndpoint =
remoteClusterIdToPB.get(addId).getPrivateEndpoint();
// For old versions that do no have status field set
ClusterStatus clusterStatus =
remoteClusterIdToPB.get(addId).hasClusterStatus()
? remoteClusterIdToPB.get(addId).getClusterStatus() :
ClusterStatus.NORMAL;
@@ -118,8 +118,14 @@ public class CloudClusterChecker extends MasterDaemon {
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS,
String.valueOf(clusterStatus));
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
- newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
publicEndpoint);
- newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
privateEndpoint);
+ String nodePublicEndpoint = node.hasPublicEndpoint() &&
!node.getPublicEndpoint().isEmpty()
+ ? node.getPublicEndpoint()
+ : clusterPublicEndpoint;
+ String nodePrivateEndpoint = node.hasPrivateEndpoint() &&
!node.getPrivateEndpoint().isEmpty()
+ ? node.getPrivateEndpoint()
+ : clusterPrivateEndpoint;
+ newTagMap.put(Tag.PUBLIC_ENDPOINT, nodePublicEndpoint);
+ newTagMap.put(Tag.PRIVATE_ENDPOINT, nodePrivateEndpoint);
newTagMap.put(Tag.CLOUD_UNIQUE_ID,
node.getCloudUniqueId());
b.setTagMap(newTagMap);
toAdd.add(b);
@@ -200,28 +206,32 @@ public class CloudClusterChecker extends MasterDaemon {
// edit log
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
}
- updateIfComputeNodeEndpointChanged(remoteClusterPb, be);
+ updateIfComputeNodeEndpointChanged(remoteClusterPb, node, be);
}
}
- private void updateIfComputeNodeEndpointChanged(ClusterPB remoteClusterPb,
Backend be) {
+ private void updateIfComputeNodeEndpointChanged(ClusterPB remoteClusterPb,
Cloud.NodeInfoPB node, Backend be) {
// check PublicEndpoint、PrivateEndpoint is changed?
boolean netChanged = false;
- String remotePublicEndpoint = remoteClusterPb.getPublicEndpoint();
- String localPublicEndpoint =
be.getTagMap().get(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT);
+ String remotePublicEndpoint = node.hasPublicEndpoint() &&
!node.getPublicEndpoint().isEmpty()
+ ? node.getPublicEndpoint()
+ : remoteClusterPb.getPublicEndpoint();
+ String localPublicEndpoint = be.getTagMap().get(Tag.PUBLIC_ENDPOINT);
if (!localPublicEndpoint.equals(remotePublicEndpoint)) {
LOG.info("be {} has changed public_endpoint from {} to {}",
be, localPublicEndpoint, remotePublicEndpoint);
- be.getTagMap().put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
remotePublicEndpoint);
+ be.getTagMap().put(Tag.PUBLIC_ENDPOINT, remotePublicEndpoint);
netChanged = true;
}
- String remotePrivateEndpoint = remoteClusterPb.getPrivateEndpoint();
- String localPrivateEndpoint =
be.getTagMap().get(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT);
+ String remotePrivateEndpoint = node.hasPrivateEndpoint() &&
!node.getPrivateEndpoint().isEmpty()
+ ? node.getPrivateEndpoint()
+ : remoteClusterPb.getPrivateEndpoint();
+ String localPrivateEndpoint = be.getTagMap().get(Tag.PRIVATE_ENDPOINT);
if (!localPrivateEndpoint.equals(remotePrivateEndpoint)) {
LOG.info("be {} has changed private_endpoint from {} to {}",
be, localPrivateEndpoint, remotePrivateEndpoint);
- be.getTagMap().put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
remotePrivateEndpoint);
+ be.getTagMap().put(Tag.PRIVATE_ENDPOINT, remotePrivateEndpoint);
netChanged = true;
}
if (netChanged) {
@@ -322,6 +332,12 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.warn("cant get valid add from ms {}", node);
continue;
}
+ String publicEndpoint = node.hasPublicEndpoint() &&
!node.getPublicEndpoint().isEmpty()
+ ? node.getPublicEndpoint()
+ : remoteClusterIdToPB.get(cid).getPublicEndpoint();
+ String privateEndpoint = node.hasPrivateEndpoint() &&
!node.getPrivateEndpoint().isEmpty()
+ ? node.getPrivateEndpoint()
+ :
remoteClusterIdToPB.get(cid).getPrivateEndpoint();
String endpoint = host + ":" + node.getHeartbeatPort();
Backend b = new Backend(Env.getCurrentEnv().getNextId(),
host, node.getHeartbeatPort());
if (node.hasIsSmoothUpgrade()) {
@@ -332,9 +348,8 @@ public class CloudClusterChecker extends MasterDaemon {
Map<String, String> newTagMap =
Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_NAME,
remoteClusterIdToPB.get(cid).getClusterName());
newTagMap.put(Tag.CLOUD_CLUSTER_ID,
remoteClusterIdToPB.get(cid).getClusterId());
- newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
remoteClusterIdToPB.get(cid).getPublicEndpoint());
- newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
- remoteClusterIdToPB.get(cid).getPrivateEndpoint());
+ newTagMap.put(Tag.PUBLIC_ENDPOINT, publicEndpoint);
+ newTagMap.put(Tag.PRIVATE_ENDPOINT, privateEndpoint);
newTagMap.put(Tag.CLOUD_UNIQUE_ID,
node.getCloudUniqueId());
b.setTagMap(newTagMap);
nodeMap.put(endpoint, b);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 61dbcb4d053..81765e18029 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -578,7 +578,12 @@ public class CloudSystemInfoService extends
SystemInfoService {
@Override
public void addBackends(List<HostInfo> hostInfos, Map<String, String>
tagMap) throws UserException {
// issue rpc to meta to add this node, then fe master would add this
node to its backends
-
+ if (Strings.isNullOrEmpty(((CloudEnv)
Env.getCurrentEnv()).getCloudInstanceId())) {
+ throw new DdlException("unable to add backends due to empty
cloud_instance_id");
+ }
+ if (hostInfos.isEmpty()) {
+ return;
+ }
String clusterName = tagMap.getOrDefault(Tag.COMPUTE_GROUP_NAME,
Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME);
if (clusterName.isEmpty()) {
throw new UserException("ComputeGroup'name can not be empty");
@@ -590,7 +595,48 @@ public class CloudSystemInfoService extends
SystemInfoService {
: String.valueOf(Config.cluster_id);
String cloudUniqueId = "1:" + instanceId + ":" +
RandomIdentifierGenerator.generateRandomIdentifier(8);
- alterBackendCluster(hostInfos, computeGroupId, cloudUniqueId,
Cloud.AlterClusterRequest.Operation.ADD_NODE);
+
+ String publicEndpoint = tagMap.getOrDefault(Tag.PUBLIC_ENDPOINT, "");
+ String privateEndpoint = tagMap.getOrDefault(Tag.PRIVATE_ENDPOINT, "");
+
+ Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
+ .setClusterId(computeGroupId)
+ .setType(Cloud.ClusterPB.Type.COMPUTE)
+ .build();
+
+ for (HostInfo hostInfo : hostInfos) {
+ Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder()
+ .setCloudUniqueId(cloudUniqueId)
+ .setIp(hostInfo.getHost())
+ .setHost(hostInfo.getHost())
+ .setHeartbeatPort(hostInfo.getPort())
+ .setCtime(System.currentTimeMillis() / 1000)
+ .setPublicEndpoint(publicEndpoint)
+ .setPrivateEndpoint(privateEndpoint)
+ .build();
+ clusterPB = clusterPB.toBuilder().addNodes(nodeInfoPB).build();
+ LOG.info("adding backend node: host={}, port={},
publicEndpoint={}, privateEndpoint={}",
+ hostInfo.getHost(), hostInfo.getPort(), publicEndpoint,
privateEndpoint);
+ }
+
+ Cloud.AlterClusterRequest request =
Cloud.AlterClusterRequest.newBuilder()
+ .setInstanceId(((CloudEnv)
Env.getCurrentEnv()).getCloudInstanceId())
+ .setOp(Cloud.AlterClusterRequest.Operation.ADD_NODE)
+ .setCluster(clusterPB)
+ .build();
+
+ Cloud.AlterClusterResponse response;
+ try {
+ response = MetaServiceProxy.getInstance().alterCluster(request);
+ LOG.info("add backends, request: {}, response: {}", request,
response);
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("add backends not ok, response: {}", response);
+ throw new DdlException("failed to add backends errorCode: " +
response.getStatus().getCode()
+ + " msg: " + response.getStatus().getMsg());
+ }
+ } catch (RpcException e) {
+ throw new DdlException("failed to add backends", e);
+ }
}
// final entry of dropping backend
@@ -1040,8 +1086,8 @@ public class CloudSystemInfoService extends
SystemInfoService {
String clusterNameMeta = cpb.getClusterName();
Cloud.ClusterStatus clusterStatus = cpb.hasClusterStatus()
? cpb.getClusterStatus() : Cloud.ClusterStatus.NORMAL;
- String publicEndpoint = cpb.getPublicEndpoint();
- String privateEndpoint = cpb.getPrivateEndpoint();
+ String clusterPublicEndpoint = cpb.getPublicEndpoint();
+ String clusterPrivateEndpoint = cpb.getPrivateEndpoint();
// Prepare backends
List<Backend> backends = new ArrayList<>();
for (Cloud.NodeInfoPB node : cpb.getNodesList()) {
@@ -1050,14 +1096,24 @@ public class CloudSystemInfoService extends
SystemInfoService {
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS,
String.valueOf(clusterStatus));
- newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
- newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
+ // Prioritize node-level endpoint configuration, use cluster-level
endpoint if
+ // node endpoint is empty
+ String nodePublicEndpoint = node.hasPublicEndpoint() &&
!node.getPublicEndpoint().isEmpty()
+ ? node.getPublicEndpoint()
+ : clusterPublicEndpoint;
+ String nodePrivateEndpoint = node.hasPrivateEndpoint() &&
!node.getPrivateEndpoint().isEmpty()
+ ? node.getPrivateEndpoint()
+ : clusterPrivateEndpoint;
+
+ newTagMap.put(Tag.PUBLIC_ENDPOINT, nodePublicEndpoint);
+ newTagMap.put(Tag.PRIVATE_ENDPOINT, nodePrivateEndpoint);
newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
Backend b = new Backend(Env.getCurrentEnv().getNextId(),
node.getIp(), node.getHeartbeatPort());
b.setTagMap(newTagMap);
backends.add(b);
- LOG.info("new backend to add, clusterName={} clusterId={}
backend={}",
- clusterNameMeta, clusterId, b.toString());
+ LOG.info(
+ "new backend to add, clusterName={} clusterId={}
backend={}, publicEndpoint={}, privateEndpoint={}",
+ clusterNameMeta, clusterId, b.toString(),
nodePublicEndpoint, nodePrivateEndpoint);
}
updateCloudBackends(backends, new ArrayList<>());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 907522946d9..3511d01d210 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -79,6 +79,9 @@ public class LoadAction extends RestBaseController {
public static final String REDIRECT_POLICY_PUBLIC_PRIVATE =
"public-private";
public static final String REDIRECT_POLICY_RANDOM_BE = "random-be";
+ public static final String REDIRECT_POLICY_DIRECT = "direct";
+ public static final String REDIRECT_POLICY_PUBLIC = "public";
+ public static final String REDIRECT_POLICY_PRIVATE = "private";
private ExecuteEnv execEnv = ExecuteEnv.getInstance();
@@ -435,7 +438,7 @@ public class LoadAction extends RestBaseController {
if (backend == null) {
throw new
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " +
policy);
}
- return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+ return selectEndpointByRedirectPolicy(request, backend);
}
private TNetworkAddress selectCloudRedirectBackend(String clusterName,
HttpServletRequest req, boolean groupCommit,
@@ -447,33 +450,50 @@ public class LoadAction extends RestBaseController {
} else {
backend = StreamLoadHandler.selectBackend(clusterName);
}
+ return selectEndpointByRedirectPolicy(req, backend);
+ }
- String redirectPolicy =
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
- // User specified redirect policy
- if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
- return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
- }
- redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
- ? Config.streamload_redirect_policy : redirectPolicy;
-
+ /**
+ * Selects the endpoint address based on the redirect policy specified in
the request header.
+ * The available redirect policies are:
+ * - DIRECT: Redirects to the backend's host.
+ * - PUBLIC: Redirects to the public endpoint of the backend.
+ * - PRIVATE: Redirects to the private endpoint of the backend.
+ * - PUBLIC_PRIVATE: Redirects based on the host IP or domain. If the
host is a site-local
+ * address, redirects to the private endpoint. Otherwise, redirects to
the public endpoint.
+ * - DEFAULT: If request host equals to backend's public endpoint,
redirects to the public endpoint.
+ * If private endpoint of backend is set, redirects to the private
endpoint. Otherwise, redirects
+ * to the backend's host.
+ *
+ * @param req The HTTP request object.
+ * @param backend The backend to redirect to.
+ * @return The selected endpoint address.
+ * @throws LoadException If there is an error in the redirect policy or
endpoint selection.
+ */
+ private TNetworkAddress selectEndpointByRedirectPolicy(HttpServletRequest
req, Backend backend)
+ throws LoadException {
Pair<String, Integer> publicHostPort = null;
Pair<String, Integer> privateHostPort = null;
try {
- if (!Strings.isNullOrEmpty(backend.getCloudPublicEndpoint())) {
- publicHostPort =
splitHostAndPort(backend.getCloudPublicEndpoint());
+ if (!Strings.isNullOrEmpty(backend.getPublicEndpoint())) {
+ publicHostPort = splitHostAndPort(backend.getPublicEndpoint());
}
} catch (AnalysisException e) {
throw new LoadException(e.getMessage());
}
try {
- if (!Strings.isNullOrEmpty(backend.getCloudPrivateEndpoint())) {
- privateHostPort =
splitHostAndPort(backend.getCloudPrivateEndpoint());
+ if (!Strings.isNullOrEmpty(backend.getPrivateEndpoint())) {
+ privateHostPort =
splitHostAndPort(backend.getPrivateEndpoint());
}
} catch (AnalysisException e) {
throw new LoadException(e.getMessage());
}
+ String redirectPolicy =
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
+ redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
+ ? Config.streamload_redirect_policy : redirectPolicy;
+
String reqHostStr = req.getHeader(HttpHeaderNames.HOST.toString());
reqHostStr = reqHostStr.replaceAll("\\s+", "");
if (reqHostStr.isEmpty()) {
@@ -492,7 +512,21 @@ public class LoadAction extends RestBaseController {
throw new LoadException("Invalid header host: " + reqHost);
}
- if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
+ // User specified redirect policy
+ if (redirectPolicy != null &&
(redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_DIRECT)
+ ||
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE))) {
+ return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
+ } else if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC)) {
+ if (publicHostPort != null) {
+ return new TNetworkAddress(publicHostPort.first,
publicHostPort.second);
+ }
+ throw new LoadException("public endpoint is null, please check be
public endpoint config");
+ } else if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PRIVATE)) {
+ if (privateHostPort != null) {
+ return new TNetworkAddress(privateHostPort.first,
privateHostPort.second);
+ }
+ throw new LoadException("private endpoint is null, please check be
private endpoint config");
+ } else if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
// redirect with ip
if (InetAddressValidator.getInstance().isValid(reqHost)) {
InetAddress addr;
@@ -525,10 +559,10 @@ public class LoadAction extends RestBaseController {
}
} else {
if (InetAddressValidator.getInstance().isValid(reqHost)
- && publicHostPort != null && reqHost ==
publicHostPort.first) {
+ && publicHostPort != null &&
reqHost.equalsIgnoreCase(publicHostPort.first)) {
return new TNetworkAddress(publicHostPort.first,
publicHostPort.second);
} else if (privateHostPort != null) {
- return new TNetworkAddress(reqHost, privateHostPort.second);
+ return new TNetworkAddress(privateHostPort.first,
privateHostPort.second);
} else {
return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index 7d6a18829cf..17b7db694ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -69,9 +69,12 @@ public class Tag implements Writable {
public static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
public static final String CLOUD_CLUSTER_ID = "cloud_cluster_id";
public static final String CLOUD_UNIQUE_ID = "cloud_unique_id";
+ public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
public static final String CLOUD_CLUSTER_PUBLIC_ENDPOINT =
"cloud_cluster_public_endpoint";
public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT =
"cloud_cluster_private_endpoint";
- public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
+
+ public static final String PUBLIC_ENDPOINT = "public_endpoint";
+ public static final String PRIVATE_ENDPOINT = "private_endpoint";
public static final String COMPUTE_GROUP_NAME = "compute_group_name";
@@ -86,6 +89,7 @@ public class Tag implements Writable {
VALUE_MIX, VALUE_DEFAULT_CLUSTER);
private static final String TAG_TYPE_REGEX = "^[a-z][a-z0-9_]{0,32}$";
private static final String TAG_VALUE_REGEX =
"^[a-zA-Z][a-zA-Z0-9_]{0,32}$";
+ private static final String ENDPOINT_REGEX = "^[a-zA-Z0-9.-]+(:[0-9]+)?$";
public static final Tag DEFAULT_BACKEND_TAG;
@@ -112,7 +116,13 @@ public class Tag implements Writable {
if (!type.matches(TAG_TYPE_REGEX)) {
throw new AnalysisException("Invalid tag type format: " + type);
}
- if (!value.matches(TAG_VALUE_REGEX)) {
+
+ // if type is an endpoint type, value must be a valid endpoint
+ if (type.equalsIgnoreCase(PUBLIC_ENDPOINT) ||
type.equalsIgnoreCase(PRIVATE_ENDPOINT)) {
+ if (!value.matches(ENDPOINT_REGEX)) {
+ throw new AnalysisException("Invalid " + type + " value
format: " + value);
+ }
+ } else if (!value.matches(TAG_VALUE_REGEX)) {
throw new AnalysisException("Invalid tag value format: " + value);
}
return new Tag(type, value);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 4ef7fa58f6c..f0a7990c9bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -222,12 +222,23 @@ public class Backend implements Writable {
return tagMap.getOrDefault(Tag.CLOUD_UNIQUE_ID, "");
}
- public String getCloudPublicEndpoint() {
- return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, "");
+ // This modification changes
+ // CLOUD_CLUSTER_PUBLIC_ENDPOINT/CLOUD_CLUSTER_PRIVATE_ENDPOINT to
+ // PUBLIC_ENDPOINT/PRIVATE_ENDPOINT, but backend information
+ // has been persisted in the edit log. For upgrade compatibility, the tag
may
+ // not have public_endpoint/private_endpoint
+ // during initial upgrade, so we first try to get
+ // CLOUD_CLUSTER_PUBLIC_ENDPOINT/CLOUD_CLUSTER_PRIVATE_ENDPOINT, and later
it
+ // will be
+ // synchronized from meta service to the public_endpoint/private_endpoint
tag.
+ // CLOUD_CLUSTER_PUBLIC_ENDPOINT/CLOUD_CLUSTER_PRIVATE_ENDPOINT can be
+ // removed in future versions.
+ public String getPublicEndpoint() {
+ return tagMap.getOrDefault(Tag.PUBLIC_ENDPOINT,
tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, ""));
}
- public String getCloudPrivateEndpoint() {
- return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, "");
+ public String getPrivateEndpoint() {
+ return tagMap.getOrDefault(Tag.PRIVATE_ENDPOINT,
tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, ""));
}
public long getId() {
@@ -1028,12 +1039,9 @@ public class Backend implements Writable {
Map<String, String> displayTagMap = Maps.newHashMap();
displayTagMap.putAll(tagMap);
- if (displayTagMap.containsKey("cloud_cluster_public_endpoint")) {
- displayTagMap.put("public_endpoint",
displayTagMap.remove("cloud_cluster_public_endpoint"));
- }
- if (displayTagMap.containsKey("cloud_cluster_private_endpoint")) {
- displayTagMap.put("private_endpoint",
displayTagMap.remove("cloud_cluster_private_endpoint"));
- }
+ // Migrate old cloud endpoint tags to new endpoint tags for backward
compatibility
+ migrateEndpointTag(displayTagMap, "cloud_cluster_public_endpoint",
"public_endpoint");
+ migrateEndpointTag(displayTagMap, "cloud_cluster_private_endpoint",
"private_endpoint");
if (displayTagMap.containsKey("cloud_cluster_status")) {
displayTagMap.put("compute_group_status",
displayTagMap.remove("cloud_cluster_status"));
}
@@ -1087,4 +1095,21 @@ public class Backend implements Writable {
return !Collections.disjoint(wgTagSet, beTagSet);
}
+ /**
+ * Migrate endpoint tag from old name to new name for backward
compatibility.
+ * If new tag already exists, just remove the old tag.
+ * If new tag doesn't exist, rename old tag to new tag.
+ */
+ private void migrateEndpointTag(Map<String, String> tagMap, String
oldTagName, String newTagName) {
+ if (tagMap.containsKey(oldTagName)) {
+ if (tagMap.containsKey(newTagName)) {
+ // New tag exists, just remove the old one
+ tagMap.remove(oldTagName);
+ } else {
+ // New tag doesn't exist, migrate old tag to new tag
+ tagMap.put(newTagName, tagMap.remove(oldTagName));
+ }
+ }
+ }
+
}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 0669064c52e..2700f097687 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -178,6 +178,8 @@ message NodeInfoPB {
optional bool is_smooth_upgrade = 12;
// fqdn
optional string host = 13;
+ optional string public_endpoint = 14;
+ optional string private_endpoint = 15;
}
enum NodeStatusPB {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 3abbed2c399..952cba30a88 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -230,6 +230,10 @@ class Backend extends ServerNode {
return path + '/conf/be.conf'
}
+ String getHeartbeatPort() {
+ return heartbeatPort;
+ }
+
}
class MetaService extends ServerNode {
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_endpoint.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_endpoint.groovy
new file mode 100644
index 00000000000..2675b3e5570
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_endpoint.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_stream_load_endpoint', 'docker') {
+ def getRedirectLocation = { feIp, fePort, redirectPolicy ->
+ def command = """ curl -v --max-redirs 0 --location-trusted -u root:
+ -H redirect-policy:$redirectPolicy
+ -T
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+ http://${feIp}:${fePort}/api/test/tbl/_stream_load """
+ log.info("redirect command: ${command}")
+
+ def code = -1
+ def location = ""
+ try {
+ def process = command.execute()
+ code = process.waitFor()
+ // Parse Location from stderr since curl -v outputs headers to
stderr
+ def errorOutput = process.err.text
+ def locationLine = errorOutput.readLines().find {
it.trim().startsWith('< Location: ') }
+ if (locationLine) {
+ location = locationLine.trim().substring('< Location:
'.length())
+ }
+ log.info("curl output: ${process.text}")
+ log.info("curl error: ${errorOutput}")
+ } catch (Exception e) {
+ log.info("exception: ${e}".toString())
+ }
+ return location
+ }
+ // local mode
+ def options = new ClusterOptions()
+ options.feNum = 1
+ options.beNum = 3
+ options.cloudMode = false
+ options.beConfigs.add('enable_java_support=false')
+ options.feConfigs.add('enable_print_request_before_execution=true')
+ docker(options) {
+ // get fe and be info
+ def feIp = cluster.getMasterFe().getHttpAddress()[0]
+ def fePort = cluster.getMasterFe().getHttpAddress()[1]
+
+ def be1Ip = cluster.getBackends().get(0).getHttpAddress()[0]
+ def be1HttpPort = cluster.getBackends().get(0).getHttpAddress()[1]
+ def be1HeartbeatPort = cluster.getBackends().get(0).getHeartbeatPort()
+
+ def be2Ip = cluster.getBackends().get(1).getHttpAddress()[0]
+ def be2HttpPort = cluster.getBackends().get(1).getHttpAddress()[1]
+ def be2HeartbeatPort = cluster.getBackends().get(1).getHeartbeatPort()
+
+ def be3Ip = cluster.getBackends().get(2).getHttpAddress()[0]
+ def be3HttpPort = cluster.getBackends().get(2).getHttpAddress()[1]
+ def be3HeartbeatPort = cluster.getBackends().get(2).getHeartbeatPort()
+
+ log.info("Initial cluster setup - 3 BEs")
+ log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2:
${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+ sql """show backends"""
+
+ // Drop BE1 and BE2
+ log.info("Dropping BE1 and BE2")
+ sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+ sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+ log.info("Backends after dropping BE1 and BE2: ${sql """show
backends""" }")
+
+ // Add BE1 back with custom endpoints
+ log.info("Adding BE1 back with custom public and private endpoints")
+ sql """ALTER SYSTEM ADD BACKEND '${be1Ip}:${be1HeartbeatPort}'
properties('tag.public_endpoint' = '11.10.10.10:8010', 'tag.private_endpoint' =
'10.10.10.9:8020')"""
+
+ // Add BE2 back with different custom endpoints
+ log.info("Adding BE2 back with different custom endpoints")
+ sql """ALTER SYSTEM ADD BACKEND '${be2Ip}:${be2HeartbeatPort}'
properties('tag.public_endpoint' = '12.20.20.20:8030', 'tag.private_endpoint' =
'11.20.20.19:8040')"""
+
+ // Modify BE3's endpoints
+ log.info("Modifying BE3's endpoints")
+ sql """ALTER SYSTEM MODIFY BACKEND '${be3Ip}:${be3HeartbeatPort}' set
('tag.public_endpoint' = '13.30.30.30:8050', 'tag.private_endpoint' =
'12.30.30.29:8060', 'tag.location' = 'default')"""
+
+ log.info("Final backends configuration: ${sql """show backends""" }")
+
+ // Test redirect locations - should use one of the available BEs
+ def location = getRedirectLocation(feIp, fePort, "public")
+ log.info("public location: ${location}")
+ assertTrue(location.contains("11.10.10.10:8010") ||
location.contains("12.20.20.20:8030") || location.contains("13.30.30.30:8050"))
+
+ location = getRedirectLocation(feIp, fePort, "private")
+ log.info("private location: ${location}")
+ assertTrue(location.contains("10.10.10.9:8020") ||
location.contains("11.20.20.19:8040") || location.contains("12.30.30.29:8060"))
+
+ location = getRedirectLocation(feIp, fePort, "direct")
+ log.info("direct location: ${location}")
+ assertTrue(location.contains("${be1Ip}:${be1HttpPort}") ||
location.contains("${be2Ip}:${be2HttpPort}") ||
location.contains("${be3Ip}:${be3HttpPort}"))
+
+ location = getRedirectLocation(feIp, fePort, "")
+ log.info("default location: ${location}")
+ assertTrue(location.contains("10.10.10.9:8020") ||
location.contains("11.20.20.19:8040") || location.contains("12.30.30.29:8060"))
+
+ // Test specific BE endpoint modifications
+ log.info("Testing endpoint configuration for specific BEs")
+
+ sql """ALTER SYSTEM MODIFY BACKEND
'${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}'
set ('tag.location' = 'default')"""
+
+ location = getRedirectLocation(feIp, fePort, "")
+ log.info("final private location test: ${location}")
+ assertTrue(location.contains("${be1Ip}:${be1HttpPort}") ||
location.contains("${be2Ip}:${be2HttpPort}") ||
location.contains("${be3Ip}:${be3HttpPort}"))
+
+ sql """ALTER SYSTEM MODIFY BACKEND
'${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}'
set ('tag.location' = 'default', 'tag.private_endpoint' =
'12.30.30.29:8060')"""
+ location = getRedirectLocation(feIp, fePort, "")
+ log.info("final private location test: ${location}")
+ assertTrue(location.contains("12.30.30.29:8060"))
+ }
+
+ // cloud mode
+ def options2 = new ClusterOptions()
+ options2.feNum = 1
+ options2.beNum = 3
+ options2.cloudMode = true
+ options2.beConfigs.add('enable_java_support=false')
+ options2.feConfigs.add('enable_print_request_before_execution=true')
+ docker(options2) {
+ // get fe and be info
+ def feIp = cluster.getMasterFe().getHttpAddress()[0]
+ def fePort = cluster.getMasterFe().getHttpAddress()[1]
+
+ def be1Ip = cluster.getBackends().get(0).getHttpAddress()[0]
+ def be1HttpPort = cluster.getBackends().get(0).getHttpAddress()[1]
+ def be1HeartbeatPort = cluster.getBackends().get(0).getHeartbeatPort()
+
+ def be2Ip = cluster.getBackends().get(1).getHttpAddress()[0]
+ def be2HttpPort = cluster.getBackends().get(1).getHttpAddress()[1]
+ def be2HeartbeatPort = cluster.getBackends().get(1).getHeartbeatPort()
+
+ def be3Ip = cluster.getBackends().get(2).getHttpAddress()[0]
+ def be3HttpPort = cluster.getBackends().get(2).getHttpAddress()[1]
+ def be3HeartbeatPort = cluster.getBackends().get(2).getHeartbeatPort()
+
+ def msIp = cluster.getMetaservices().get(0).getHttpAddress()[0]
+ def msPort = cluster.getMetaservices().get(0).getHttpAddress()[1]
+
+ log.info("Initial cluster setup - 3 BEs")
+ log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2:
${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+ sql """show backends"""
+
+ // Drop BE1 and BE2
+ log.info("Dropping BE1 and BE2")
+ sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+ sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+ log.info("Backends after dropping BE1 and BE2: ${sql """show
backends""" }")
+
+ // Add BE1 back with custom endpoints
+ log.info("Adding BE1 back with custom public and private endpoints")
+ sql """ALTER SYSTEM ADD BACKEND '${be1Ip}:${be1HeartbeatPort}'
properties('tag.public_endpoint' = '11.10.10.10:8010', 'tag.private_endpoint' =
'10.10.10.9:8020',"tag.compute_group_name" = "default_compute_group")"""
+
+ // Drop BE3
+ sql """ALTER SYSTEM DROPP BACKEND '${be3Ip}:${be3HeartbeatPort}'"""
+
+ // Add BE2 BE3 back with different custom endpoints
+ log.info("Adding BE2 BE3 back with different custom endpoints")
+ sql """ALTER SYSTEM ADD BACKEND
'${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}'
properties('tag.public_endpoint' = '12.20.20.20:8030', 'tag.private_endpoint' =
'11.20.20.19:8040', "tag.compute_group_name" = "default_compute_group")"""
+
+ sleep(30000)
+
+ log.info("Final backends configuration: ${sql """show backends""" }")
+
+ // Test redirect locations - should use one of the available BEs
+ def location = getRedirectLocation(feIp, fePort, "public")
+ log.info("public location: ${location}")
+ assertTrue(location.contains("11.10.10.10:8010") ||
location.contains("12.20.20.20:8030"))
+
+ location = getRedirectLocation(feIp, fePort, "private")
+ log.info("private location: ${location}")
+ assertTrue(location.contains("10.10.10.9:8020") ||
location.contains("11.20.20.19:8040"))
+
+ location = getRedirectLocation(feIp, fePort, "direct")
+ log.info("direct location: ${location}")
+ assertTrue(location.contains("${be1Ip}:${be1HttpPort}") ||
location.contains("${be2Ip}:${be2HttpPort}") ||
location.contains("${be3Ip}:${be3HttpPort}"))
+
+ location = getRedirectLocation(feIp, fePort, "")
+ log.info("default location: ${location}")
+ assertTrue(location.contains("10.10.10.9:8020") ||
location.contains("11.20.20.19:8040"))
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org