This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c689e99f [INLONG-8960][Manager] Automatically assign sort cluster 
after creating stream sinks (#9001)
51c689e99f is described below

commit 51c689e99f8db05ed3abcd206d1babbe34acd46e
Author: vernedeng <[email protected]>
AuthorDate: Wed Sep 27 20:51:01 2023 +0800

    [INLONG-8960][Manager] Automatically assign sort cluster after creating 
stream sinks (#9001)
---
 .../dao/mapper/InlongClusterEntityMapper.java      |  2 +
 .../manager/dao/mapper/StreamSinkEntityMapper.java |  2 +
 .../mappers/InlongClusterEntityMapper.xml          | 10 +++
 .../resources/mappers/StreamSinkEntityMapper.xml   | 12 +++
 .../sortstandalone/SortStandaloneClusterDTO.java   | 65 ---------------
 .../apache/inlong/manager/pojo/sink/SinkInfo.java  |  1 +
 .../cluster/SortStandaloneClusterOperator.java     | 31 ++++---
 .../AbstractStandaloneSinkResourceOperator.java    | 94 ++++++++++++++++++++++
 .../resource/sink/cls/ClsResourceOperator.java     |  5 +-
 .../sink/es/ElasticsearchResourceOperator.java     |  5 +-
 .../sink/pulsar/PulsarResourceOperator.java        |  5 +-
 .../resource/sink/StandaloneAutoAssignTest.java    | 92 +++++++++++++++++++++
 .../sink/TestStandaloneSinkResourceOperator.java}  | 35 ++------
 13 files changed, 244 insertions(+), 115 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index ce2644691d..a0d71abc5e 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -64,4 +64,6 @@ public interface InlongClusterEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
+    List<InlongClusterEntity> selectStandaloneClusterByType(@Param("sinkType") 
String sinkType);
+
 }
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 9810878ff3..0687972c0b 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -161,4 +161,6 @@ public interface StreamSinkEntityMapper {
      */
     int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
 
+    String selectAssignedCluster(@Param("dataNodeName") String dataNodeName);
+
 }
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index f554faf64a..77f0eb3769 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -189,6 +189,16 @@
         from inlong_cluster
         where is_deleted = 0
     </select>
+    <select id="selectStandaloneClusterByType" 
resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_cluster
+        <where>
+            type = 'SORTSTANDALONE'
+            and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag)
+            and is_deleted = 0
+        </where>
+    </select>
 
     <update id="updateById" 
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
         update inlong_cluster
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 6d43fd4968..b80e5f2d5a 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -330,6 +330,7 @@
         sink.inlong_group_id,
         sink.inlong_stream_id,
         sink.sink_type,
+        sink.inlong_cluster_name,
         sink.description,
         sink.enable_create_resource,
         sink.ext_params,
@@ -389,6 +390,17 @@
         from stream_sink
         where is_deleted = 0
     </select>
+    <select id="selectAssignedCluster" resultType="java.lang.String">
+        select inlong_cluster_name
+        from stream_sink
+        <where>
+            data_node_name = #{dataNodeName, jdbcType=VARCHAR}
+            and is_deleted = 0
+        </where>
+        group by inlong_cluster_name
+        order by count(*) asc
+        limit 1
+    </select>
     <update id="updateByIdSelective" 
parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
         update stream_sink
         <set>
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
deleted file mode 100644
index 0eec2ded29..0000000000
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.cluster.sortstandalone;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.lang3.StringUtils;
-
-import javax.validation.constraints.NotNull;
-
-import java.util.Set;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("SortStandalone cluster info")
-public class SortStandaloneClusterDTO {
-
-    @ApiModelProperty(value = "Supported sink types")
-    private Set<String> supportedSinkTypes;
-
-    public static SortStandaloneClusterDTO 
getFromRequest(SortStandaloneClusterRequest request, String extParams) {
-        SortStandaloneClusterDTO dto = StringUtils.isNotBlank(extParams)
-                ? SortStandaloneClusterDTO.getFromJson(extParams)
-                : new SortStandaloneClusterDTO();
-        return CommonBeanUtils.copyProperties(request, dto, true);
-    }
-
-    /**
-     * Get the dto instance from the JSON string.
-     */
-    public static SortStandaloneClusterDTO getFromJson(@NotNull String 
extParams) {
-        try {
-            return JsonUtils.parseObject(extParams, 
SortStandaloneClusterDTO.class);
-        } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
-                    String.format("parse extParams of SortStandalone Cluster 
failure: %s", e.getMessage()));
-        }
-    }
-}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
index c2d2c3fff2..78c728eecf 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
@@ -31,6 +31,7 @@ public class SinkInfo {
     private String inlongGroupId;
     private String inlongStreamId;
     private String sinkType;
+    private String inlongClusterName;
     private String sinkName;
     private String dataNodeName;
     private String description;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
index 9e04f33c0d..3cf12837c2 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.cluster;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -24,35 +25,30 @@ import 
org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
-import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterDTO;
 import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
 import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Set;
+
 @Slf4j
 @Service
 public class SortStandaloneClusterOperator extends AbstractClusterOperator {
 
-    @Autowired
-    private ObjectMapper objectMapper;
-
     @Override
     protected void setTargetEntity(ClusterRequest request, InlongClusterEntity 
targetEntity) {
         SortStandaloneClusterRequest standaloneRequest = 
(SortStandaloneClusterRequest) request;
         CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true);
-        try {
-            SortStandaloneClusterDTO dto = 
SortStandaloneClusterDTO.getFromRequest(standaloneRequest,
-                    targetEntity.getExtParams());
-            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
-            log.debug("success to set entity for SortStandalone cluster");
-        } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
-                    String.format("serialize extParams of SortStandalone 
Cluster failure: %s", e.getMessage()));
+        Set<String> supportedTypes = standaloneRequest.getSupportedSinkTypes();
+        if (CollectionUtils.isNotEmpty(supportedTypes)) {
+            String extTag = 
Joiner.on(InlongConstants.COMMA).join(supportedTypes);
+            targetEntity.setExtTag(extTag);
         }
     }
 
@@ -74,9 +70,10 @@ public class SortStandaloneClusterOperator extends 
AbstractClusterOperator {
 
         SortStandaloneClusterInfo clusterInfo = new 
SortStandaloneClusterInfo();
         CommonBeanUtils.copyProperties(entity, clusterInfo);
-        if (StringUtils.isNotBlank(entity.getExtParams())) {
-            SortStandaloneClusterDTO dto = 
SortStandaloneClusterDTO.getFromJson(entity.getExtParams());
-            CommonBeanUtils.copyProperties(dto, clusterInfo);
+        String extTag = entity.getExtTag();
+        if (StringUtils.isNotBlank(extTag)) {
+            Set<String> supportedTypes = 
Sets.newHashSet(extTag.split(InlongConstants.COMMA));
+            clusterInfo.setSupportedSinkTypes(supportedTypes);
         }
         return clusterInfo;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
new file mode 100644
index 0000000000..a7e3fb9e62
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public abstract class AbstractStandaloneSinkResourceOperator implements 
SinkResourceOperator {
+
+    @Autowired
+    private InlongClusterEntityMapper clusterEntityMapper;
+    @Autowired
+    private StreamSinkEntityMapper sinkEntityMapper;
+    @Autowired
+    private InlongGroupEntityMapper groupEntityMapper;
+
+    private Random rand = new Random();
+
+    @VisibleForTesting
+    protected void assignCluster(SinkInfo sinkInfo) {
+        if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
+            return;
+        }
+
+        String targetCluster = assignOneCluster(sinkInfo);
+        Preconditions.expectNotBlank(targetCluster,
+                String.format("find no proper cluster assign to group=%s, 
stream=%s, sink type=%s, data node=%s ",
+                        sinkInfo.getInlongGroupId(), 
sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
+                        sinkInfo.getDataNodeName()));
+
+        StreamSinkEntity sink = 
sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
+        sink.setInlongClusterName(targetCluster);
+        sinkEntityMapper.updateByIdSelective(sink);
+    }
+
+    private String assignOneCluster(SinkInfo sinkInfo) {
+        return StringUtils
+                .firstNonBlank(assignFromExist(sinkInfo.getDataNodeName()),
+                        assignFromRelated(sinkInfo.getSinkType(), 
sinkInfo.getInlongGroupId()));
+    }
+
+    private String assignFromExist(String dataNodeName) {
+        return sinkEntityMapper.selectAssignedCluster(dataNodeName);
+    }
+
+    private String assignFromRelated(String sinkType, String groupId) {
+        InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
+        List<InlongClusterEntity> clusters = clusterEntityMapper
+                .selectStandaloneClusterByType(sinkType).stream()
+                .filter(cluster -> checkCluster(cluster.getClusterTags(), 
group.getInlongClusterTag()))
+                .collect(Collectors.toList());
+
+        return CollectionUtils.isEmpty(clusters) ? null : 
clusters.get(rand.nextInt(clusters.size())).getName();
+
+    }
+
+    private boolean checkCluster(String clusterTags, String targetTag) {
+        return StringUtils.isBlank(clusterTags)
+                || 
Sets.newHashSet(clusterTags.split(InlongConstants.COMMA)).contains(targetTag);
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
index e69f298ffe..20effc5f93 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -31,7 +31,7 @@ import 
org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
-import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import 
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 
 import com.tencentcloudapi.cls.v20201016.ClsClient;
@@ -52,7 +52,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
-public class ClsResourceOperator implements SinkResourceOperator {
+public class ClsResourceOperator extends 
AbstractStandaloneSinkResourceOperator {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ClsResourceOperator.class);
 
@@ -79,6 +79,7 @@ public class ClsResourceOperator implements 
SinkResourceOperator {
             return;
         }
         this.createTopicID(sinkInfo);
+        this.assignCluster(sinkInfo);
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
index 52b854b207..70165bf541 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
 import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
-import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import 
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -45,7 +45,7 @@ import java.util.List;
  * Elasticsearch's resource operator
  */
 @Service
-public class ElasticsearchResourceOperator implements SinkResourceOperator {
+public class ElasticsearchResourceOperator extends 
AbstractStandaloneSinkResourceOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchResourceOperator.class);
     @Autowired
@@ -76,6 +76,7 @@ public class ElasticsearchResourceOperator implements 
SinkResourceOperator {
         }
 
         this.createIndex(sinkInfo);
+        this.assignCluster(sinkInfo);
     }
 
     private void createIndex(SinkInfo sinkInfo) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
index 0d6df611ea..a38b391131 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -34,7 +34,7 @@ import 
org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
 import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
-import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import 
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -49,7 +49,7 @@ import org.springframework.stereotype.Service;
  * Pulsar resource operate for creating pulsar resource
  */
 @Service
-public class PulsarResourceOperator implements SinkResourceOperator {
+public class PulsarResourceOperator extends 
AbstractStandaloneSinkResourceOperator {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarResourceOperator.class);
 
@@ -76,6 +76,7 @@ public class PulsarResourceOperator implements 
SinkResourceOperator {
             return;
         }
         this.createTopic(sinkInfo);
+        this.assignCluster(sinkInfo);
     }
 
     private void createTopic(SinkInfo sinkInfo) {
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
new file mode 100644
index 0000000000..b7a246003e
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink;
+
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+import java.util.Set;
+
+public class StandaloneAutoAssignTest extends ServiceBaseTest {
+
+    @Autowired
+    private StreamSinkEntityMapper sinkEntityMapper;
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private TestStandaloneSinkResourceOperator testResourceOperator;
+    @Test
+    public void testAutoAssign() {
+
+        String group = "autoGroup";
+        String stream = "autoStream";
+        InlongGroupInfo groupInfo = this.createInlongGroup(group, 
MQType.PULSAR);
+        InlongStreamInfo streamInfo = this.createStreamInfo(groupInfo, stream);
+        Integer id = saveClsSink(groupInfo.getInlongGroupId(), 
streamInfo.getInlongStreamId());
+
+        String clusterName = "clsCluster";
+        Set<String> types = Sets.newHashSet(SinkType.CLS, 
SinkType.ELASTICSEARCH);
+        saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName, 
types);
+
+        List<SinkInfo> sinkInfos = 
sinkEntityMapper.selectAllConfig(groupInfo.getInlongGroupId(), null);
+        Assertions.assertEquals(1, sinkInfos.size());
+        SinkInfo clsSinkInfo = sinkInfos.get(0);
+
+        testResourceOperator.assignCluster(clsSinkInfo);
+
+        StreamSinkEntity newClsEntity = 
sinkEntityMapper.selectByPrimaryKey(id);
+        Assertions.assertEquals(clusterName, 
newClsEntity.getInlongClusterName());
+
+    }
+
+    public Integer saveClsSink(String groupId, String streamId) {
+        StreamSinkEntity clsSinkEntity = new StreamSinkEntity();
+        clsSinkEntity.setDataNodeName("testNode");
+        clsSinkEntity.setSinkType(SinkType.CLS);
+        clsSinkEntity.setSinkName("testClsSink");
+        clsSinkEntity.setInlongGroupId(groupId);
+        clsSinkEntity.setInlongStreamId(streamId);
+        clsSinkEntity.setCreator(GLOBAL_OPERATOR);
+
+        return sinkEntityMapper.insert(clsSinkEntity);
+    }
+
+    public Integer saveStandaloneCluster(String clusterTag, String 
clusterName, Set<String> supportedSinkTypes) {
+        SortStandaloneClusterRequest request = new 
SortStandaloneClusterRequest();
+        request.setClusterTags(clusterTag);
+        request.setName(clusterName);
+        request.setSupportedSinkTypes(supportedSinkTypes);
+        request.setInCharges(GLOBAL_OPERATOR);
+        return clusterService.save(request, GLOBAL_OPERATOR);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/TestStandaloneSinkResourceOperator.java
similarity index 51%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
copy to 
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/TestStandaloneSinkResourceOperator.java
index c2d2c3fff2..5d9567737f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/TestStandaloneSinkResourceOperator.java
@@ -15,34 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.service.resource.sink;
 
-import io.swagger.annotations.ApiModel;
-import lombok.Data;
+import org.springframework.stereotype.Service;
 
-/**
- * Sink info - with stream
- */
-@Data
-@ApiModel("Sink info - with stream")
-public class SinkInfo {
-
-    private Integer id;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private String sinkType;
-    private String sinkName;
-    private String dataNodeName;
-    private String description;
-    private Integer enableCreateResource;
-    private String extParams;
-    private Integer status;
-    private String creator;
-
-    // Inlong stream info
-    private String mqResource;
-    private String dataType;
-    private String sourceSeparator; // Source separator configured in the 
stream info
-    private String dataEscapeChar;
+@Service
+public class TestStandaloneSinkResourceOperator extends 
AbstractStandaloneSinkResourceOperator {
 
+    @Override
+    public Boolean accept(String sinkType) {
+        return null;
+    }
 }

Reply via email to