This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 47c4fa01a3 [INLONG-9484][Manager] Improve logic of sortstandalone
sink auto-assigned cluster (#9485)
47c4fa01a3 is described below
commit 47c4fa01a3b8760989ac01a8808db0073b9be638
Author: vernedeng <[email protected]>
AuthorDate: Fri Dec 15 14:21:02 2023 +0800
[INLONG-9484][Manager] Improve logic of sortstandalone sink auto-assigned
cluster (#9485)
---
.../manager/pojo/cluster/ClusterRequest.java | 2 +-
.../inlong/manager/pojo/node/DataNodeRequest.java | 2 +-
.../AbstractStandaloneSinkResourceOperator.java | 37 +++++++++++++---------
3 files changed, 24 insertions(+), 17 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
index a2dd867a0c..0030a13810 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
@@ -48,7 +48,7 @@ public abstract class ClusterRequest {
private Integer id;
@ApiModelProperty(value = "Cluster name")
- @Pattern(regexp = "^[A-Za-z0-9_-]{1,128}$", message = "only supports
letters, numbers, '-', or '_'")
+ @Pattern(regexp = "^[A-Za-z0-9._-]{1,128}$", message = "only supports
letters, numbers, '.', '-', or '_'")
private String name;
@ApiModelProperty(value = "Cluster display name, just for display")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index a125382c41..fc336a4749 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -48,7 +48,7 @@ public abstract class DataNodeRequest {
private Integer id;
@ApiModelProperty(value = "Data node name")
- @Pattern(regexp = "^[A-Za-z0-9_-]{1,128}$", message = "only supports
letters, numbers, '-', or '_'")
+ @Pattern(regexp = "^[A-Za-z0-9._-]{1,128}$", message = "only supports
letters, numbers, '.', '-', or '_'")
private String name;
@ApiModelProperty(value = "Data node display name, just for display")
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index 7a53035a23..79893ad9c0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -17,8 +17,8 @@
package org.apache.inlong.manager.service.resource.sink;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -31,15 +31,15 @@ import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Random;
-import java.util.stream.Collectors;
+@Slf4j
public abstract class AbstractStandaloneSinkResourceOperator implements
SinkResourceOperator {
@Autowired
@@ -51,14 +51,16 @@ public abstract class
AbstractStandaloneSinkResourceOperator implements SinkReso
@Autowired
private StreamSinkService sinkService;
- private static final String SORT_PREFIX = "SORT_";
-
private Random rand = new Random();
@VisibleForTesting
protected void assignCluster(SinkInfo sinkInfo) {
+ if (StringUtils.isBlank(sinkInfo.getSinkType())) {
+ throw new
IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+ }
+
if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
- String info = "success to create es resource";
+ String info = "no need to auto-assign cluster since the cluster
has already assigned";
sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
return;
}
@@ -88,18 +90,23 @@ public abstract class
AbstractStandaloneSinkResourceOperator implements SinkReso
private String assignFromRelated(String sinkType, String groupId) {
InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
String sortClusterType = SinkType.relatedSortClusterType(sinkType);
- List<InlongClusterEntity> clusters = clusterEntityMapper
- .selectByKey(null, null, sortClusterType).stream()
- .filter(cluster -> checkCluster(cluster.getClusterTags(),
group.getInlongClusterTag()))
- .collect(Collectors.toList());
+ if (StringUtils.isBlank(sortClusterType)) {
+ log.error("find no relate sort cluster type for sink type={}",
sinkType);
+ return null;
+ }
- return CollectionUtils.isEmpty(clusters) ? null :
clusters.get(rand.nextInt(clusters.size())).getName();
+ // if some clusters have the same tag
+ List<InlongClusterEntity> clusters =
+ clusterEntityMapper.selectByKey(group.getInlongClusterTag(),
null, sortClusterType);
+ if (!CollectionUtils.isEmpty(clusters)) {
+ return clusters.get(rand.nextInt(clusters.size())).getName();
+ }
- }
+ // if no cluster with the same tag
+ clusters = clusterEntityMapper.selectByKey(null, null,
sortClusterType);
+
+ return CollectionUtils.isEmpty(clusters) ? null :
clusters.get(rand.nextInt(clusters.size())).getName();
- private boolean checkCluster(String clusterTags, String targetTag) {
- return StringUtils.isBlank(clusterTags)
- ||
Sets.newHashSet(clusterTags.split(InlongConstants.COMMA)).contains(targetTag);
}
}