This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2573798d5 [INLONG-4937][Manager] One inlong group id can transform 
data across two cluster tags (#4938)
2573798d5 is described below

commit 2573798d52c41a7ee4d7df0c948d5098743898b6
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Jul 20 10:21:42 2022 +0800

    [INLONG-4937][Manager] One inlong group id can transform data across two 
cluster tags (#4938)
---
 inlong-manager/manager-service/pom.xml             |  40 ++
 .../repository/DataProxyConfigRepository.java      | 428 ++++++++++++++++++---
 .../repository/DataProxyConfigRepositoryTest.java  | 189 +++++++++
 .../controller/openapi/DataProxyController.java    |  34 +-
 4 files changed, 642 insertions(+), 49 deletions(-)

diff --git a/inlong-manager/manager-service/pom.xml 
b/inlong-manager/manager-service/pom.xml
index 8acf8b440..da4c8739f 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -506,6 +506,46 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.objenesis</groupId>
+                    <artifactId>objenesis</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.bytebuddy</groupId>
+                    <artifactId>byte-buddy-agent</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-testng</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.testng</groupId>
+                    <artifactId>testng</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.bytebuddy</groupId>
+                    <artifactId>byte-buddy-agent</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.javassist</groupId>
+                    <artifactId>javassist</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 7c0a4feb5..513bec16c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,6 +19,9 @@ package org.apache.inlong.manager.service.repository;
 
 import com.google.common.base.Splitter;
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
@@ -29,11 +32,21 @@ import org.apache.inlong.common.pojo.dataproxy.IRepository;
 import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
 import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster;
 import org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId;
 import org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId;
 import org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster;
+import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +54,8 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.PostConstruct;
+import java.lang.reflect.InvocationTargetException;
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -51,6 +65,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.annotation.PostConstruct;
+
 /**
  * DataProxyConfigRepository
  */
@@ -58,11 +74,19 @@ import java.util.concurrent.ConcurrentHashMap;
 @Repository(value = "dataProxyConfigRepository")
 public class DataProxyConfigRepository implements IRepository {
 
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(DataProxyConfigRepository.class);
+
+    public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
+    public static final String KEY_BACKUP_TOPIC = "backup_topic";
+    public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
+    public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
+    public static final String KEY_SORT_CONSUEMER_GROUP = 
"defaultSortConsumerGroup";
+    public static final String KEY_SINK_NAME = "defaultSinkName";
+
     public static final Splitter.MapSplitter MAP_SPLITTER = 
Splitter.on(SEPARATOR).trimResults()
             .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
     public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
     public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DataProxyConfigRepository.class);
     private static final Gson gson = new Gson();
 
     // key: proxyClusterName, value: jsonString
@@ -74,6 +98,12 @@ public class DataProxyConfigRepository implements 
IRepository {
 
     @Autowired
     private ClusterSetMapper clusterSetMapper;
+    @Autowired
+    private InlongClusterEntityMapper clusterMapper;
+    @Autowired
+    private InlongGroupEntityMapper inlongGroupMapper;
+    @Autowired
+    private StreamSinkEntityMapper streamSinkMapper;
 
     @PostConstruct
     public void initialize() {
@@ -87,6 +117,70 @@ public class DataProxyConfigRepository implements 
IRepository {
         }
     }
 
+    /**
+     * get clusterSetMapper
+     * @return the clusterSetMapper
+     */
+    public ClusterSetMapper getClusterSetMapper() {
+        return clusterSetMapper;
+    }
+
+    /**
+     * set clusterSetMapper
+     * @param clusterSetMapper the clusterSetMapper to set
+     */
+    public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
+        this.clusterSetMapper = clusterSetMapper;
+    }
+
+    /**
+     * get clusterMapper
+     * @return the clusterMapper
+     */
+    public InlongClusterEntityMapper getClusterMapper() {
+        return clusterMapper;
+    }
+
+    /**
+     * set clusterMapper
+     * @param clusterMapper the clusterMapper to set
+     */
+    public void setClusterMapper(InlongClusterEntityMapper clusterMapper) {
+        this.clusterMapper = clusterMapper;
+    }
+
+    /**
+     * get inlongGroupMapper
+     * @return the inlongGroupMapper
+     */
+    public InlongGroupEntityMapper getInlongGroupMapper() {
+        return inlongGroupMapper;
+    }
+
+    /**
+     * set inlongGroupMapper
+     * @param inlongGroupMapper the inlongGroupMapper to set
+     */
+    public void setInlongGroupMapper(InlongGroupEntityMapper 
inlongGroupMapper) {
+        this.inlongGroupMapper = inlongGroupMapper;
+    }
+
+    /**
+     * get streamSinkMapper
+     * @return the streamSinkMapper
+     */
+    public StreamSinkEntityMapper getStreamSinkMapper() {
+        return streamSinkMapper;
+    }
+
+    /**
+     * set streamSinkMapper
+     * @param streamSinkMapper the streamSinkMapper to set
+     */
+    public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkMapper) {
+        this.streamSinkMapper = streamSinkMapper;
+    }
+
     /**
      * reload
      */
@@ -152,7 +246,6 @@ public class DataProxyConfigRepository implements 
IRepository {
     /**
      * reloadInlongId
      */
-    @SuppressWarnings("unchecked")
     private Map<String, List<InLongIdObject>> reloadInlongId() {
         // parse group
         Map<String, InlongGroupId> groupIdMap = new HashMap<>();
@@ -165,53 +258,108 @@ public class DataProxyConfigRepository implements 
IRepository {
             if (groupId == null) {
                 continue;
             }
-            // choose topic
-            String groupTopic = groupIdObj.getTopic();
-            String streamTopic = streamIdObj.getTopic();
-            String finalTopic = null;
-            if (StringUtils.isEmpty(groupTopic)) {
-                // both empty then ignore
-                if (StringUtils.isEmpty(streamTopic)) {
-                    continue;
-                } else {
-                    finalTopic = streamTopic;
-                }
+            Map<String, String> groupParams = 
this.getExtParams(groupIdObj.getExtParams());
+            Map<String, String> streamParams = 
this.getExtParams(streamIdObj.getExtParams());
+            this.parseMasterTopic(groupIdObj, streamIdObj, groupParams, 
streamParams, inlongIdMap);
+            this.parseBackupTopic(groupIdObj, streamIdObj, groupParams, 
streamParams, inlongIdMap);
+        }
+        return inlongIdMap;
+    }
+
+    /**
+     * Parse an ext params JSON string into a map; an empty map is returned if it is empty or unparsable.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, String> getExtParams(String extParams) {
+        // parse extparams
+        if (!StringUtils.isEmpty(extParams)) {
+            try {
+                Map<String, String> groupParams = gson.fromJson(extParams, 
HashMap.class);
+                return groupParams;
+            } catch (Exception e) {
+                LOGGER.error("Fail to parse ext error:{},params:{}", 
e.getMessage(), extParams, e);
+            }
+        }
+        return new HashMap<>();
+    }
+
+    /**
+     * Build the InLongIdObject for the master topic and add it under the cluster tag of the group.
+     */
+    private void parseMasterTopic(InlongGroupId groupIdObj, InlongStreamId 
streamIdObj,
+            Map<String, String> groupParams, Map<String, String> streamParams,
+            Map<String, List<InLongIdObject>> inlongIdMap) {
+        // choose topic
+        String groupTopic = groupIdObj.getTopic();
+        String streamTopic = streamIdObj.getTopic();
+        String finalTopic = null;
+        if (StringUtils.isEmpty(groupTopic)) {
+            // both empty then ignore
+            if (StringUtils.isEmpty(streamTopic)) {
+                return;
             } else {
-                if (StringUtils.isEmpty(streamTopic)) {
-                    finalTopic = groupTopic;
-                } else {
-                    // Pulsar: namespace+topic
-                    finalTopic = groupTopic + "/" + streamTopic;
-                }
+                finalTopic = streamTopic;
             }
-            // concat id
-            InLongIdObject obj = new InLongIdObject();
-            String inlongId = groupId + "." + streamIdObj.getInlongStreamId();
-            obj.setInlongId(inlongId);
-            obj.setTopic(finalTopic);
-            Map<String, String> params = new HashMap<>();
-            obj.setParams(params);
-            // parse group extparams
-            if (!StringUtils.isEmpty(groupIdObj.getExtParams())) {
-                try {
-                    Map<String, String> groupParams = 
gson.fromJson(groupIdObj.getExtParams(), Map.class);
-                    params.putAll(groupParams);
-                } catch (Exception e) {
-                    LOGGER.error(e.getMessage(), e);
-                }
+        } else {
+            if (StringUtils.isEmpty(streamTopic)) {
+                finalTopic = groupTopic;
+            } else {
+                // Pulsar: namespace+topic
+                finalTopic = groupTopic + "/" + streamTopic;
             }
-            // parse stream extparams
-            if (!StringUtils.isEmpty(streamIdObj.getExtParams())) {
-                try {
-                    Map<String, String> streamParams = 
gson.fromJson(streamIdObj.getExtParams(), Map.class);
-                    params.putAll(streamParams);
-                } catch (Exception e) {
-                    LOGGER.error(e.getMessage(), e);
-                }
+        }
+        // concat id
+        InLongIdObject obj = new InLongIdObject();
+        String inlongId = streamIdObj.getInlongGroupId() + "." + 
streamIdObj.getInlongStreamId();
+        obj.setInlongId(inlongId);
+        obj.setTopic(finalTopic);
+        Map<String, String> params = new HashMap<>();
+        params.putAll(groupParams);
+        params.putAll(streamParams);
+        obj.setParams(params);
+        inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new 
ArrayList<>()).add(obj);
+    }
+
+    /**
+     * Build the InLongIdObject for the backup topic and add it under the backup cluster tag, if both are set.
+     */
+    private void parseBackupTopic(InlongGroupId groupIdObj, InlongStreamId 
streamIdObj,
+            Map<String, String> groupParams, Map<String, String> streamParams,
+            Map<String, List<InLongIdObject>> inlongIdMap) {
+        Map<String, String> params = new HashMap<>();
+        params.putAll(groupParams);
+        params.putAll(streamParams);
+        // find backup cluster tag
+        String clusterTag = params.get(KEY_BACKUP_CLUSTER_TAG);
+        if (StringUtils.isEmpty(clusterTag)) {
+            return;
+        }
+        // find backup topic
+        String groupTopic = groupParams.get(KEY_BACKUP_TOPIC);
+        String streamTopic = streamParams.get(KEY_BACKUP_TOPIC);
+        String finalTopic = null;
+        if (StringUtils.isEmpty(groupTopic)) {
+            // both empty then ignore
+            if (StringUtils.isEmpty(streamTopic)) {
+                return;
+            } else {
+                finalTopic = streamTopic;
+            }
+        } else {
+            if (StringUtils.isEmpty(streamTopic)) {
+                finalTopic = groupTopic;
+            } else {
+                // Pulsar: namespace+topic
+                finalTopic = groupTopic + "/" + streamTopic;
             }
-            inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new 
ArrayList<>()).add(obj);
         }
-        return inlongIdMap;
+        // concat id
+        InLongIdObject obj = new InLongIdObject();
+        String inlongId = streamIdObj.getInlongGroupId() + "." + 
streamIdObj.getInlongStreamId();
+        obj.setInlongId(inlongId);
+        obj.setTopic(finalTopic);
+        obj.setParams(params);
+        inlongIdMap.computeIfAbsent(clusterTag, k -> new 
ArrayList<>()).add(obj);
     }
 
     /**
@@ -315,4 +463,194 @@ public class DataProxyConfigRepository implements 
IRepository {
     public String getProxyConfigJson(String clusterName) {
         return this.proxyConfigJson.get(clusterName);
     }
+
+    /**
+     * Move a group to a new cluster tag and topic; the old tag and topic are kept as backup in its ext params.
+     */
+    public String changeClusterTag(String inlongGroupId, String clusterTag,
+            String topic) {
+        try {
+            // select
+            InlongGroupEntity oldGroup = 
inlongGroupMapper.selectByGroupId(inlongGroupId);
+            if (oldGroup == null) {
+                throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+            }
+            String oldClusterTag = oldGroup.getInlongClusterTag();
+            if (StringUtils.equals(oldClusterTag, clusterTag)) {
+                return "Cluster tag is same.";
+            }
+            // prepare group
+            final InlongGroupEntity newGroup = 
this.prepareClusterTagGroup(oldGroup, clusterTag, topic);
+            // load cluster
+            Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+            ClusterPageRequest clusterRequest = new ClusterPageRequest();
+            List<InlongClusterEntity> clusters = 
clusterMapper.selectByCondition(clusterRequest);
+            clusters.forEach((v) -> {
+                clusterMap.put(v.getName(), v);
+            });
+            // prepare stream sink
+            SinkPageRequest request = new SinkPageRequest();
+            request.setInlongGroupId(inlongGroupId);
+            List<StreamSinkEntity> streamSinks = 
streamSinkMapper.selectByCondition(request);
+            List<StreamSinkEntity> newStreamSinks = new ArrayList<>();
+            for (StreamSinkEntity streamSink : streamSinks) {
+                String clusterName = streamSink.getInlongClusterName();
+                InlongClusterEntity cluster = clusterMap.get(clusterName);
+                if (cluster == null) {
+                    continue;
+                }
+                if (!StringUtils.equals(oldClusterTag, 
cluster.getClusterTags())) {
+                    continue;
+                }
+                String clusterType = cluster.getType();
+                // find the cluster of same cluster tag and sink type, and add 
new stream sink
+                StreamSinkEntity newStreamSink = 
this.createNewStreamSink(clusters, clusterType, clusterTag,
+                        streamSink);
+                if (newStreamSink != null) {
+                    newStreamSinks.add(newStreamSink);
+                }
+            }
+            // update
+            newStreamSinks.forEach((v) -> {
+                streamSinkMapper.insert(v);
+            });
+            inlongGroupMapper.updateByIdentifierSelective(newGroup);
+            return inlongGroupId;
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+            return e.getMessage();
+        }
+    }
+
+    /**
+     * Copy the source sink onto the first cluster matching the type and cluster tag; null if it cannot be built.
+     */
+    private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity> 
clusters, String clusterType,
+            String clusterTag, StreamSinkEntity srcStreamSink) {
+        for (InlongClusterEntity v : clusters) {
+            if (StringUtils.equals(clusterType, v.getType())
+                    && StringUtils.equals(clusterTag, v.getClusterTags())) {
+                String newExtParams = v.getExtParams();
+                Gson gson = new Gson();
+                JsonObject extParams = gson.fromJson(newExtParams, 
JsonObject.class);
+                if (extParams.has(KEY_SINK_NAME) && 
extParams.has(KEY_SORT_TASK_NAME)
+                        && extParams.has(KEY_DATA_NODE_NAME) && 
extParams.has(KEY_SORT_CONSUEMER_GROUP)) {
+                    final String sinkName = 
extParams.get(KEY_SINK_NAME).getAsString();
+                    final String sortTaskName = 
extParams.get(KEY_SORT_TASK_NAME).getAsString();
+                    final String dataNodeName = 
extParams.get(KEY_DATA_NODE_NAME).getAsString();
+                    final String sortConsumerGroup = 
extParams.get(KEY_SORT_CONSUEMER_GROUP).getAsString();
+                    StreamSinkEntity newStreamSink = 
copyStreamSink(srcStreamSink);
+                    newStreamSink.setInlongClusterName(v.getName());
+                    newStreamSink.setSinkName(sinkName);
+                    newStreamSink.setSortTaskName(sortTaskName);
+                    newStreamSink.setDataNodeName(dataNodeName);
+                    newStreamSink.setSortConsumerGroup(sortConsumerGroup);
+                    return newStreamSink;
+                }
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Copy a stream sink, clearing its id and refreshing its modify time; null is returned if the copy fails.
+     */
+    private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSink) {
+        try {
+            StreamSinkEntity streamSinkDest = new StreamSinkEntity();
+            BeanUtils.copyProperties(streamSinkDest, streamSink);
+            streamSinkDest.setId(null);
+            streamSinkDest.setModifyTime(new Date(System.currentTimeMillis()));
+            return streamSinkDest;
+        } catch (Exception e) {
+            LOGGER.error("Fail to copy stream sink:{}", e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Copy the group with the new cluster tag and topic, recording the old ones as backup in its ext params.
+     */
+    private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity 
oldGroup, String clusterTag, String topic)
+            throws IllegalAccessException, InvocationTargetException {
+        // parse ext_params
+        String extParams = oldGroup.getExtParams();
+        if (StringUtils.isEmpty(extParams)) {
+            extParams = "{}";
+        }
+        // parse json
+        Gson gson = new Gson();
+        JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+        // change cluster tag
+        extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG, 
oldGroup.getInlongClusterTag());
+        extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
+        // copy properties
+        InlongGroupEntity newGroup = new InlongGroupEntity();
+        BeanUtils.copyProperties(newGroup, oldGroup);
+        newGroup.setId(null);
+        // change properties
+        newGroup.setInlongClusterTag(clusterTag);
+        newGroup.setMqResource(topic);
+        String newExtParams = extParamsObj.toString();
+        newGroup.setExtParams(newExtParams);
+        return newGroup;
+    }
+
+    /**
+     * Remove the backup cluster tag and topic from the group and delete its sinks on backup-tag clusters.
+     */
+    public String removeBackupClusterTag(String inlongGroupId) {
+        // select
+        InlongGroupEntity oldGroup = 
inlongGroupMapper.selectByGroupId(inlongGroupId);
+        if (oldGroup == null) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+        // parse ext_params
+        String extParams = oldGroup.getExtParams();
+        if (StringUtils.isEmpty(extParams)) {
+            return inlongGroupId;
+        }
+        // parse json
+        Gson gson = new Gson();
+        JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+        if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
+            return inlongGroupId;
+        }
+        final String oldClusterTag = 
extParamsObj.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
+        extParamsObj.remove(KEY_BACKUP_CLUSTER_TAG);
+        extParamsObj.remove(KEY_BACKUP_TOPIC);
+        String newExtParams = extParamsObj.toString();
+        oldGroup.setExtParams(newExtParams);
+        // update group
+        inlongGroupMapper.updateByIdentifierSelective(oldGroup);
+
+        // load cluster
+        Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+        ClusterPageRequest clusterRequest = new ClusterPageRequest();
+        List<InlongClusterEntity> clusters = 
clusterMapper.selectByCondition(clusterRequest);
+        clusters.forEach((v) -> {
+            clusterMap.put(v.getName(), v);
+        });
+        // prepare stream sink
+        SinkPageRequest request = new SinkPageRequest();
+        request.setInlongGroupId(inlongGroupId);
+        List<StreamSinkEntity> streamSinks = 
streamSinkMapper.selectByCondition(request);
+        List<StreamSinkEntity> deleteStreamSinks = new ArrayList<>();
+        for (StreamSinkEntity streamSink : streamSinks) {
+            String clusterName = streamSink.getInlongClusterName();
+            InlongClusterEntity cluster = clusterMap.get(clusterName);
+            if (cluster == null) {
+                continue;
+            }
+            if (StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
+                deleteStreamSinks.add(streamSink);
+            }
+        }
+        // delete old stream sink
+        deleteStreamSinks.forEach((v) -> {
+            streamSinkMapper.deleteByPrimaryKey(v.getId());
+        });
+        return inlongGroupId;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
new file mode 100644
index 000000000..ae9ff0114
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.repository;
+
+import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.junit.jupiter.api.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+/**
+ * DataProxyConfigRepositoryTest
+ * 
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({InlongGroupEntityMapper.class, StreamSinkEntityMapper.class, 
InlongClusterEntityMapper.class})
+public class DataProxyConfigRepositoryTest {
+
+    private static final String INLONG_GROUP_ID = "03a00000026";
+    private static final String INLONG_STREAM_ID = "1";
+    private static final String CLUSTER_TAG_NEW = "ct_new";
+    private static final String CLUSTER_TAG_OLD = "ct_old";
+    private static final String TOPIC_NEW = "t_03a00000026";
+    private static final String TOPIC_OLD = "t_03a00000026";
+    private static final String CLS_CLUSTER_NEW = "cls_new";
+    private static final String CLS_CLUSTER_OLD = "cls_old";
+    private static final String CLS_DATA_NODE_NEW = "sid_cls_new";
+    private static final String CLS_DATA_NODE_OLD = "sid_cls_old";
+
+    @Test
+    public void testChangeClusterTag() {
+        DataProxyConfigRepository repository = new DataProxyConfigRepository();
+        repository.setInlongGroupMapper(this.mockGroupMapper());
+        repository.setClusterMapper(this.mockClusterMapper());
+        repository.setStreamSinkMapper(this.mockStreamSinkMapper());
+        String inlongGroupId = repository.changeClusterTag(INLONG_GROUP_ID, 
CLUSTER_TAG_NEW, TOPIC_NEW);
+        assertEquals(inlongGroupId, INLONG_GROUP_ID);
+    }
+
+    private InlongGroupEntityMapper mockGroupMapper() {
+        InlongGroupEntity entity = new InlongGroupEntity();
+        entity.setInlongGroupId(INLONG_GROUP_ID);
+        entity.setInlongClusterTag(CLUSTER_TAG_OLD);
+        entity.setMqResource(TOPIC_OLD);
+        InlongGroupEntityMapper mapper = 
PowerMockito.mock(InlongGroupEntityMapper.class);
+        
PowerMockito.when(mapper.selectByGroupId(anyString())).thenReturn(entity);
+        
PowerMockito.when(mapper.updateByIdentifierSelective(any())).thenReturn(1);
+        return mapper;
+    }
+
+    private InlongClusterEntityMapper mockClusterMapper() {
+        final List<InlongClusterEntity> clusters = new ArrayList<>();
+        InlongClusterEntity dataProxyCluster = new InlongClusterEntity();
+        dataProxyCluster.setName("dp_1");
+        dataProxyCluster.setType("DATA_PROXY");
+        dataProxyCluster.setClusterTags(CLUSTER_TAG_OLD);
+        dataProxyCluster.setExtTag("sz=true");
+        dataProxyCluster.setExtParams("{}");
+        clusters.add(dataProxyCluster);
+        InlongClusterEntity pulsarCluster = new InlongClusterEntity();
+        pulsarCluster.setName("pc_1");
+        pulsarCluster.setType("PULSAR");
+        pulsarCluster.setClusterTags(CLUSTER_TAG_OLD);
+        pulsarCluster.setExtTag("sz=true&producer=true&consumer=true");
+        pulsarCluster.setExtParams("{}");
+        clusters.add(pulsarCluster);
+        InlongClusterEntity clsCluster = new InlongClusterEntity();
+        clsCluster.setName(CLS_CLUSTER_OLD);
+        clsCluster.setType("cls");
+        clsCluster.setClusterTags(CLUSTER_TAG_OLD);
+        
clsCluster.setExtParams(String.format("{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\"}",
+                DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_OLD,
+                DataProxyConfigRepository.KEY_SORT_TASK_NAME, 
CLS_DATA_NODE_OLD,
+                DataProxyConfigRepository.KEY_DATA_NODE_NAME, 
CLS_DATA_NODE_OLD,
+                DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP, 
CLS_DATA_NODE_OLD));
+        clusters.add(clsCluster);
+        InlongClusterEntity kafkaCluster = new InlongClusterEntity();
+        kafkaCluster.setName("kafka_1");
+        kafkaCluster.setType("kafka");
+        kafkaCluster.setExtParams("{}");
+        clusters.add(kafkaCluster);
+        InlongClusterEntity clsCluster2 = new InlongClusterEntity();
+        clsCluster2.setName(CLS_CLUSTER_NEW);
+        clsCluster2.setType("cls");
+        clsCluster2.setClusterTags(CLUSTER_TAG_NEW);
+        
clsCluster2.setExtParams(String.format("{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\"}",
+                DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_NEW,
+                DataProxyConfigRepository.KEY_SORT_TASK_NAME, 
CLS_DATA_NODE_NEW,
+                DataProxyConfigRepository.KEY_DATA_NODE_NAME, 
CLS_DATA_NODE_NEW,
+                DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP, 
CLS_DATA_NODE_NEW));
+        clusters.add(clsCluster2);
+        InlongClusterEntityMapper mapper = 
PowerMockito.mock(InlongClusterEntityMapper.class);
+        
PowerMockito.when(mapper.selectByCondition(any())).thenReturn(clusters);
+        return mapper;
+    }
+
+    private StreamSinkEntityMapper mockStreamSinkMapper() {
+        final List<StreamSinkEntity> streamSinks = new ArrayList<>();
+        StreamSinkEntity clsSink = new StreamSinkEntity();
+        clsSink.setInlongGroupId(INLONG_GROUP_ID);
+        clsSink.setInlongStreamId(INLONG_STREAM_ID);
+        clsSink.setInlongClusterName(CLS_CLUSTER_OLD);
+        clsSink.setSinkName(CLS_DATA_NODE_OLD);
+        clsSink.setSortTaskName(CLS_DATA_NODE_OLD);
+        clsSink.setDataNodeName(CLS_DATA_NODE_OLD);
+        clsSink.setSortConsumerGroup(CLS_DATA_NODE_OLD);
+        clsSink.setSinkType("cls");
+        streamSinks.add(clsSink);
+        StreamSinkEntity kafkaSink = new StreamSinkEntity();
+        kafkaSink.setInlongGroupId(INLONG_GROUP_ID);
+        kafkaSink.setInlongStreamId(INLONG_STREAM_ID);
+        kafkaSink.setInlongClusterName("kafka_1");
+        kafkaSink.setSinkName("sid_" + kafkaSink.getInlongClusterName());
+        kafkaSink.setSortTaskName("sid_" + kafkaSink.getInlongClusterName());
+        kafkaSink.setDataNodeName("sid_" + kafkaSink.getInlongClusterName());
+        kafkaSink.setSortConsumerGroup("sid_" + 
kafkaSink.getInlongClusterName());
+        kafkaSink.setSinkType("kafka");
+        streamSinks.add(kafkaSink);
+        StreamSinkEntityMapper mapper = 
PowerMockito.mock(StreamSinkEntityMapper.class);
+        
PowerMockito.when(mapper.selectByCondition(any())).thenReturn(streamSinks);
+        PowerMockito.when(mapper.insert(any())).thenReturn(1);
+        PowerMockito.when(mapper.deleteByPrimaryKey(anyInt())).thenReturn(1);
+        return mapper;
+    }
+
+    @Test
+    public void testRemoveBackupClusterTag() {
+        final DataProxyConfigRepository repository = new 
DataProxyConfigRepository();
+        // group
+        InlongGroupEntityMapper groupMapper = this.mockGroupMapper();
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(INLONG_GROUP_ID);
+        groupEntity.setInlongClusterTag(CLUSTER_TAG_NEW);
+        groupEntity.setMqResource(TOPIC_NEW);
+        groupEntity.setExtParams(
+                String.format("{\"%s\":\"%s\",\"%s\":\"%s\"}", 
DataProxyConfigRepository.KEY_BACKUP_CLUSTER_TAG,
+                        CLUSTER_TAG_OLD, 
DataProxyConfigRepository.KEY_BACKUP_TOPIC, TOPIC_OLD));
+        repository.setInlongGroupMapper(groupMapper);
+        // cluster
+        repository.setClusterMapper(this.mockClusterMapper());
+        // stream sink
+        StreamSinkEntityMapper streamSinkMapper = this.mockStreamSinkMapper();
+        SinkPageRequest sinkPageRequest = new SinkPageRequest();
+        final List<StreamSinkEntity> streamSinks = 
streamSinkMapper.selectByCondition(sinkPageRequest);
+        StreamSinkEntity clsSink = new StreamSinkEntity();
+        clsSink.setInlongGroupId(INLONG_GROUP_ID);
+        clsSink.setInlongStreamId(INLONG_STREAM_ID);
+        clsSink.setInlongClusterName(CLS_CLUSTER_NEW);
+        clsSink.setSinkName(CLS_DATA_NODE_NEW);
+        clsSink.setSortTaskName(CLS_DATA_NODE_NEW);
+        clsSink.setDataNodeName(CLS_DATA_NODE_NEW);
+        clsSink.setSortConsumerGroup(CLS_DATA_NODE_NEW);
+        clsSink.setSinkType("cls");
+        streamSinks.add(clsSink);
+        repository.setStreamSinkMapper(streamSinkMapper);
+        // test
+        String inlongGroupId = 
repository.removeBackupClusterTag(INLONG_GROUP_ID);
+        assertEquals(INLONG_GROUP_ID, inlongGroupId);
+    }
+}
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 518dc50d3..a3b7e1284 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -17,27 +17,30 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
-import io.swagger.annotations.ApiOperation;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.List;
 
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+
 /**
  * Data proxy controller.
  */
@@ -49,6 +52,8 @@ public class DataProxyController {
     @Autowired
     @Lazy
     private InlongClusterService clusterService;
+    @Autowired
+    private DataProxyConfigRepository dataProxyConfigRepository;
 
     @PostMapping(value = "/dataproxy/getIpList")
     @ApiOperation(value = "Get data proxy ip list by cluster name and tag")
@@ -82,4 +87,25 @@ public class DataProxyController {
         return clusterService.getAllConfig(clusterName, md5);
     }
 
+    /**
+     * Changes the cluster tag and topic of an inlong group.
+     *
+     * <p>Mapped to HTTP PUT on the path "/changeClusterTag". The three request
+     * parameters are forwarded unchanged to
+     * {@link DataProxyConfigRepository#changeClusterTag}, and the string it returns
+     * is wrapped in a success response. No validation is done in this method;
+     * presumably the repository validates its arguments (to be confirmed there).
+     *
+     * @param inlongGroupId id of the inlong group whose cluster tag is changed
+     * @param clusterTag cluster tag handed to the repository
+     * @param topic topic handed to the repository
+     * @return success response carrying the string returned by the repository
+     */
+    @RequestMapping(value = "/changeClusterTag", method = RequestMethod.PUT)
+    @ApiOperation(value = "Change cluster tag and topic of a inlong group id.")
+    public Response<String> changeClusterTag(@RequestParam String 
inlongGroupId, @RequestParam String clusterTag,
+            @RequestParam String topic) {
+        String result = 
dataProxyConfigRepository.changeClusterTag(inlongGroupId, clusterTag, topic);
+        return Response.success(result);
+    }
+
+    /**
+     * Removes the backup cluster tag and topic of an inlong group.
+     *
+     * <p>Mapped to HTTP PUT on the path "/removeBackupClusterTag". The group id is
+     * forwarded unchanged to
+     * {@link DataProxyConfigRepository#removeBackupClusterTag}, and the string it
+     * returns is wrapped in a success response. No validation is done in this
+     * method; presumably the repository validates the id (to be confirmed there).
+     *
+     * @param inlongGroupId id of the inlong group whose backup settings are removed
+     * @return success response carrying the string returned by the repository
+     */
+    @RequestMapping(value = "/removeBackupClusterTag", method = 
RequestMethod.PUT)
+    @ApiOperation(value = "remove backup cluster tag and topic of a inlong 
group id.")
+    public Response<String> removeBackupClusterTag(@RequestParam String 
inlongGroupId) {
+        String result = 
dataProxyConfigRepository.removeBackupClusterTag(inlongGroupId);
+        return Response.success(result);
+    }
+
 }

Reply via email to