This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 51c689e99f [INLONG-8960][Manager] Automatically assign sort cluster
after creating stream sinks (#9001)
51c689e99f is described below
commit 51c689e99f8db05ed3abcd206d1babbe34acd46e
Author: vernedeng <[email protected]>
AuthorDate: Wed Sep 27 20:51:01 2023 +0800
[INLONG-8960][Manager] Automatically assign sort cluster after creating
stream sinks (#9001)
---
.../dao/mapper/InlongClusterEntityMapper.java | 2 +
.../manager/dao/mapper/StreamSinkEntityMapper.java | 2 +
.../mappers/InlongClusterEntityMapper.xml | 10 +++
.../resources/mappers/StreamSinkEntityMapper.xml | 12 +++
.../sortstandalone/SortStandaloneClusterDTO.java | 65 ---------------
.../apache/inlong/manager/pojo/sink/SinkInfo.java | 1 +
.../cluster/SortStandaloneClusterOperator.java | 31 ++++---
.../AbstractStandaloneSinkResourceOperator.java | 94 ++++++++++++++++++++++
.../resource/sink/cls/ClsResourceOperator.java | 5 +-
.../sink/es/ElasticsearchResourceOperator.java | 5 +-
.../sink/pulsar/PulsarResourceOperator.java | 5 +-
.../resource/sink/StandaloneAutoAssignTest.java | 92 +++++++++++++++++++++
.../sink/TestStandaloneSinkResourceOperator.java} | 35 ++------
13 files changed, 244 insertions(+), 115 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index ce2644691d..a0d71abc5e 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -64,4 +64,6 @@ public interface InlongClusterEntityMapper {
int deleteByPrimaryKey(Integer id);
+ List<InlongClusterEntity> selectStandaloneClusterByType(@Param("sinkType")
String sinkType);
+
}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 9810878ff3..0687972c0b 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -161,4 +161,6 @@ public interface StreamSinkEntityMapper {
*/
int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+ String selectAssignedCluster(@Param("dataNodeName") String dataNodeName);
+
}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index f554faf64a..77f0eb3769 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -189,6 +189,16 @@
from inlong_cluster
where is_deleted = 0
</select>
+ <select id="selectStandaloneClusterByType"
resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_cluster
+ <where>
+ type = 'SORTSTANDALONE'
+ and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag)
+ and is_deleted = 0
+ </where>
+ </select>
<update id="updateById"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
update inlong_cluster
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 6d43fd4968..b80e5f2d5a 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -330,6 +330,7 @@
sink.inlong_group_id,
sink.inlong_stream_id,
sink.sink_type,
+ sink.inlong_cluster_name,
sink.description,
sink.enable_create_resource,
sink.ext_params,
@@ -389,6 +390,17 @@
from stream_sink
where is_deleted = 0
</select>
+ <select id="selectAssignedCluster" resultType="java.lang.String">
+ select inlong_cluster_name
+ from stream_sink
+ <where>
+ data_node_name = #{dataNodeName, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </where>
+ group by inlong_cluster_name
+ order by count(*) asc
+ limit 1
+ </select>
<update id="updateByIdSelective"
parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
update stream_sink
<set>
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
deleted file mode 100644
index 0eec2ded29..0000000000
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.cluster.sortstandalone;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.lang3.StringUtils;
-
-import javax.validation.constraints.NotNull;
-
-import java.util.Set;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("SortStandalone cluster info")
-public class SortStandaloneClusterDTO {
-
- @ApiModelProperty(value = "Supported sink types")
- private Set<String> supportedSinkTypes;
-
- public static SortStandaloneClusterDTO
getFromRequest(SortStandaloneClusterRequest request, String extParams) {
- SortStandaloneClusterDTO dto = StringUtils.isNotBlank(extParams)
- ? SortStandaloneClusterDTO.getFromJson(extParams)
- : new SortStandaloneClusterDTO();
- return CommonBeanUtils.copyProperties(request, dto, true);
- }
-
- /**
- * Get the dto instance from the JSON string.
- */
- public static SortStandaloneClusterDTO getFromJson(@NotNull String
extParams) {
- try {
- return JsonUtils.parseObject(extParams,
SortStandaloneClusterDTO.class);
- } catch (Exception e) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
- String.format("parse extParams of SortStandalone Cluster
failure: %s", e.getMessage()));
- }
- }
-}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
index c2d2c3fff2..78c728eecf 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
@@ -31,6 +31,7 @@ public class SinkInfo {
private String inlongGroupId;
private String inlongStreamId;
private String sinkType;
+ private String inlongClusterName;
private String sinkName;
private String dataNodeName;
private String description;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
index 9e04f33c0d..3cf12837c2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.cluster;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -24,35 +25,30 @@ import
org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
-import
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterDTO;
import
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
import
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.Set;
+
@Slf4j
@Service
public class SortStandaloneClusterOperator extends AbstractClusterOperator {
- @Autowired
- private ObjectMapper objectMapper;
-
@Override
protected void setTargetEntity(ClusterRequest request, InlongClusterEntity
targetEntity) {
SortStandaloneClusterRequest standaloneRequest =
(SortStandaloneClusterRequest) request;
CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true);
- try {
- SortStandaloneClusterDTO dto =
SortStandaloneClusterDTO.getFromRequest(standaloneRequest,
- targetEntity.getExtParams());
- targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
- log.debug("success to set entity for SortStandalone cluster");
- } catch (Exception e) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
- String.format("serialize extParams of SortStandalone
Cluster failure: %s", e.getMessage()));
+ Set<String> supportedTypes = standaloneRequest.getSupportedSinkTypes();
+ if (CollectionUtils.isNotEmpty(supportedTypes)) {
+ String extTag =
Joiner.on(InlongConstants.COMMA).join(supportedTypes);
+ targetEntity.setExtTag(extTag);
}
}
@@ -74,9 +70,10 @@ public class SortStandaloneClusterOperator extends
AbstractClusterOperator {
SortStandaloneClusterInfo clusterInfo = new
SortStandaloneClusterInfo();
CommonBeanUtils.copyProperties(entity, clusterInfo);
- if (StringUtils.isNotBlank(entity.getExtParams())) {
- SortStandaloneClusterDTO dto =
SortStandaloneClusterDTO.getFromJson(entity.getExtParams());
- CommonBeanUtils.copyProperties(dto, clusterInfo);
+ String extTag = entity.getExtTag();
+ if (StringUtils.isNotBlank(extTag)) {
+ Set<String> supportedTypes =
Sets.newHashSet(extTag.split(InlongConstants.COMMA));
+ clusterInfo.setSupportedSinkTypes(supportedTypes);
}
return clusterInfo;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
new file mode 100644
index 0000000000..a7e3fb9e62
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public abstract class AbstractStandaloneSinkResourceOperator implements
SinkResourceOperator {
+
+ @Autowired
+ private InlongClusterEntityMapper clusterEntityMapper;
+ @Autowired
+ private StreamSinkEntityMapper sinkEntityMapper;
+ @Autowired
+ private InlongGroupEntityMapper groupEntityMapper;
+
+ private Random rand = new Random();
+
+ @VisibleForTesting
+ protected void assignCluster(SinkInfo sinkInfo) {
+ if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
+ return;
+ }
+
+ String targetCluster = assignOneCluster(sinkInfo);
+ Preconditions.expectNotBlank(targetCluster,
+ String.format("find no proper cluster assign to group=%s,
stream=%s, sink type=%s, data node=%s ",
+ sinkInfo.getInlongGroupId(),
sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
+ sinkInfo.getDataNodeName()));
+
+ StreamSinkEntity sink =
sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
+ sink.setInlongClusterName(targetCluster);
+ sinkEntityMapper.updateByIdSelective(sink);
+ }
+
+ private String assignOneCluster(SinkInfo sinkInfo) {
+ return StringUtils
+ .firstNonBlank(assignFromExist(sinkInfo.getDataNodeName()),
+ assignFromRelated(sinkInfo.getSinkType(),
sinkInfo.getInlongGroupId()));
+ }
+
+ private String assignFromExist(String dataNodeName) {
+ return sinkEntityMapper.selectAssignedCluster(dataNodeName);
+ }
+
+ private String assignFromRelated(String sinkType, String groupId) {
+ InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
+ List<InlongClusterEntity> clusters = clusterEntityMapper
+ .selectStandaloneClusterByType(sinkType).stream()
+ .filter(cluster -> checkCluster(cluster.getClusterTags(),
group.getInlongClusterTag()))
+ .collect(Collectors.toList());
+
+ return CollectionUtils.isEmpty(clusters) ? null :
clusters.get(rand.nextInt(clusters.size())).getName();
+
+ }
+
+ private boolean checkCluster(String clusterTags, String targetTag) {
+ return StringUtils.isBlank(clusterTags)
+ ||
Sets.newHashSet(clusterTags.split(InlongConstants.COMMA)).contains(targetTag);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
index e69f298ffe..20effc5f93 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -31,7 +31,7 @@ import
org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
-import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import com.tencentcloudapi.cls.v20201016.ClsClient;
@@ -52,7 +52,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
-public class ClsResourceOperator implements SinkResourceOperator {
+public class ClsResourceOperator extends
AbstractStandaloneSinkResourceOperator {
private static final Logger LOG =
LoggerFactory.getLogger(ClsResourceOperator.class);
@@ -79,6 +79,7 @@ public class ClsResourceOperator implements
SinkResourceOperator {
return;
}
this.createTopicID(sinkInfo);
+ this.assignCluster(sinkInfo);
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
index 52b854b207..70165bf541 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
-import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.commons.collections.CollectionUtils;
@@ -45,7 +45,7 @@ import java.util.List;
* Elasticsearch's resource operator
*/
@Service
-public class ElasticsearchResourceOperator implements SinkResourceOperator {
+public class ElasticsearchResourceOperator extends
AbstractStandaloneSinkResourceOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchResourceOperator.class);
@Autowired
@@ -76,6 +76,7 @@ public class ElasticsearchResourceOperator implements
SinkResourceOperator {
}
this.createIndex(sinkInfo);
+ this.assignCluster(sinkInfo);
}
private void createIndex(SinkInfo sinkInfo) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
index 0d6df611ea..a38b391131 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -34,7 +34,7 @@ import
org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
-import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -49,7 +49,7 @@ import org.springframework.stereotype.Service;
* Pulsar resource operate for creating pulsar resource
*/
@Service
-public class PulsarResourceOperator implements SinkResourceOperator {
+public class PulsarResourceOperator extends
AbstractStandaloneSinkResourceOperator {
private static final Logger LOG =
LoggerFactory.getLogger(PulsarResourceOperator.class);
@@ -76,6 +76,7 @@ public class PulsarResourceOperator implements
SinkResourceOperator {
return;
}
this.createTopic(sinkInfo);
+ this.assignCluster(sinkInfo);
}
private void createTopic(SinkInfo sinkInfo) {
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
new file mode 100644
index 0000000000..b7a246003e
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink;
+
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+import java.util.Set;
+
+public class StandaloneAutoAssignTest extends ServiceBaseTest {
+
+ @Autowired
+ private StreamSinkEntityMapper sinkEntityMapper;
+ @Autowired
+ private InlongClusterService clusterService;
+ @Autowired
+ private TestStandaloneSinkResourceOperator testResourceOperator;
+ @Test
+ public void testAutoAssign() {
+
+ String group = "autoGroup";
+ String stream = "autoStream";
+ InlongGroupInfo groupInfo = this.createInlongGroup(group,
MQType.PULSAR);
+ InlongStreamInfo streamInfo = this.createStreamInfo(groupInfo, stream);
+ Integer id = saveClsSink(groupInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());
+
+ String clusterName = "clsCluster";
+ Set<String> types = Sets.newHashSet(SinkType.CLS,
SinkType.ELASTICSEARCH);
+ saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName,
types);
+
+ List<SinkInfo> sinkInfos =
sinkEntityMapper.selectAllConfig(groupInfo.getInlongGroupId(), null);
+ Assertions.assertEquals(1, sinkInfos.size());
+ SinkInfo clsSinkInfo = sinkInfos.get(0);
+
+ testResourceOperator.assignCluster(clsSinkInfo);
+
+ StreamSinkEntity newClsEntity =
sinkEntityMapper.selectByPrimaryKey(id);
+ Assertions.assertEquals(clusterName,
newClsEntity.getInlongClusterName());
+
+ }
+
+ public Integer saveClsSink(String groupId, String streamId) {
+ StreamSinkEntity clsSinkEntity = new StreamSinkEntity();
+ clsSinkEntity.setDataNodeName("testNode");
+ clsSinkEntity.setSinkType(SinkType.CLS);
+ clsSinkEntity.setSinkName("testClsSink");
+ clsSinkEntity.setInlongGroupId(groupId);
+ clsSinkEntity.setInlongStreamId(streamId);
+ clsSinkEntity.setCreator(GLOBAL_OPERATOR);
+
+ return sinkEntityMapper.insert(clsSinkEntity);
+ }
+
+ public Integer saveStandaloneCluster(String clusterTag, String
clusterName, Set<String> supportedSinkTypes) {
+ SortStandaloneClusterRequest request = new
SortStandaloneClusterRequest();
+ request.setClusterTags(clusterTag);
+ request.setName(clusterName);
+ request.setSupportedSinkTypes(supportedSinkTypes);
+ request.setInCharges(GLOBAL_OPERATOR);
+ return clusterService.save(request, GLOBAL_OPERATOR);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/TestStandaloneSinkResourceOperator.java
similarity index 51%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
copy to
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/TestStandaloneSinkResourceOperator.java
index c2d2c3fff2..5d9567737f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/TestStandaloneSinkResourceOperator.java
@@ -15,34 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.service.resource.sink;
-import io.swagger.annotations.ApiModel;
-import lombok.Data;
+import org.springframework.stereotype.Service;
-/**
- * Sink info - with stream
- */
-@Data
-@ApiModel("Sink info - with stream")
-public class SinkInfo {
-
- private Integer id;
- private String inlongGroupId;
- private String inlongStreamId;
- private String sinkType;
- private String sinkName;
- private String dataNodeName;
- private String description;
- private Integer enableCreateResource;
- private String extParams;
- private Integer status;
- private String creator;
-
- // Inlong stream info
- private String mqResource;
- private String dataType;
- private String sourceSeparator; // Source separator configured in the
stream info
- private String dataEscapeChar;
+@Service
+public class TestStandaloneSinkResourceOperator extends
AbstractStandaloneSinkResourceOperator {
+ @Override
+ public Boolean accept(String sinkType) {
+ return null;
+ }
}