This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2573798d5 [INLONG-4937][Manager] One inlong group id can transform
data across two cluster tags (#4938)
2573798d5 is described below
commit 2573798d52c41a7ee4d7df0c948d5098743898b6
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Jul 20 10:21:42 2022 +0800
[INLONG-4937][Manager] One inlong group id can transform data across two
cluster tags (#4938)
---
inlong-manager/manager-service/pom.xml | 40 ++
.../repository/DataProxyConfigRepository.java | 428 ++++++++++++++++++---
.../repository/DataProxyConfigRepositoryTest.java | 189 +++++++++
.../controller/openapi/DataProxyController.java | 34 +-
4 files changed, 642 insertions(+), 49 deletions(-)
diff --git a/inlong-manager/manager-service/pom.xml
b/inlong-manager/manager-service/pom.xml
index 8acf8b440..da4c8739f 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -506,6 +506,46 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.objenesis</groupId>
+ <artifactId>objenesis</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.bytebuddy</groupId>
+ <artifactId>byte-buddy-agent</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-testng</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.bytebuddy</groupId>
+ <artifactId>byte-buddy-agent</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.javassist</groupId>
+ <artifactId>javassist</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 7c0a4feb5..513bec16c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,6 +19,9 @@ package org.apache.inlong.manager.service.repository;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
@@ -29,11 +32,21 @@ import org.apache.inlong.common.pojo.dataproxy.IRepository;
import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster;
import org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId;
import org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId;
import org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster;
+import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +54,8 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
-import javax.annotation.PostConstruct;
+import java.lang.reflect.InvocationTargetException;
+import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -51,6 +65,8 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.PostConstruct;
+
/**
* DataProxyConfigRepository
*/
@@ -58,11 +74,19 @@ import java.util.concurrent.ConcurrentHashMap;
@Repository(value = "dataProxyConfigRepository")
public class DataProxyConfigRepository implements IRepository {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(DataProxyConfigRepository.class);
+
+ public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
+ public static final String KEY_BACKUP_TOPIC = "backup_topic";
+ public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
+ public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
+ public static final String KEY_SORT_CONSUEMER_GROUP =
"defaultSortConsumerGroup";
+ public static final String KEY_SINK_NAME = "defaultSinkName";
+
public static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
- private static final Logger LOGGER =
LoggerFactory.getLogger(DataProxyConfigRepository.class);
private static final Gson gson = new Gson();
// key: proxyClusterName, value: jsonString
@@ -74,6 +98,12 @@ public class DataProxyConfigRepository implements
IRepository {
@Autowired
private ClusterSetMapper clusterSetMapper;
+ @Autowired
+ private InlongClusterEntityMapper clusterMapper;
+ @Autowired
+ private InlongGroupEntityMapper inlongGroupMapper;
+ @Autowired
+ private StreamSinkEntityMapper streamSinkMapper;
@PostConstruct
public void initialize() {
@@ -87,6 +117,70 @@ public class DataProxyConfigRepository implements
IRepository {
}
}
+ /**
+ * get clusterSetMapper
+ * @return the clusterSetMapper
+ */
+ public ClusterSetMapper getClusterSetMapper() {
+ return clusterSetMapper;
+ }
+
+ /**
+ * set clusterSetMapper
+ * @param clusterSetMapper the clusterSetMapper to set
+ */
+ public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
+ this.clusterSetMapper = clusterSetMapper;
+ }
+
+ /**
+ * get clusterMapper
+ * @return the clusterMapper
+ */
+ public InlongClusterEntityMapper getClusterMapper() {
+ return clusterMapper;
+ }
+
+ /**
+ * set clusterMapper
+ * @param clusterMapper the clusterMapper to set
+ */
+ public void setClusterMapper(InlongClusterEntityMapper clusterMapper) {
+ this.clusterMapper = clusterMapper;
+ }
+
+ /**
+ * get inlongGroupMapper
+ * @return the inlongGroupMapper
+ */
+ public InlongGroupEntityMapper getInlongGroupMapper() {
+ return inlongGroupMapper;
+ }
+
+ /**
+ * set inlongGroupMapper
+ * @param inlongGroupMapper the inlongGroupMapper to set
+ */
+ public void setInlongGroupMapper(InlongGroupEntityMapper
inlongGroupMapper) {
+ this.inlongGroupMapper = inlongGroupMapper;
+ }
+
+ /**
+ * get streamSinkMapper
+ * @return the streamSinkMapper
+ */
+ public StreamSinkEntityMapper getStreamSinkMapper() {
+ return streamSinkMapper;
+ }
+
+ /**
+ * set streamSinkMapper
+ * @param streamSinkMapper the streamSinkMapper to set
+ */
+ public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkMapper) {
+ this.streamSinkMapper = streamSinkMapper;
+ }
+
/**
* reload
*/
@@ -152,7 +246,6 @@ public class DataProxyConfigRepository implements
IRepository {
/**
* reloadInlongId
*/
- @SuppressWarnings("unchecked")
private Map<String, List<InLongIdObject>> reloadInlongId() {
// parse group
Map<String, InlongGroupId> groupIdMap = new HashMap<>();
@@ -165,53 +258,108 @@ public class DataProxyConfigRepository implements
IRepository {
if (groupId == null) {
continue;
}
- // choose topic
- String groupTopic = groupIdObj.getTopic();
- String streamTopic = streamIdObj.getTopic();
- String finalTopic = null;
- if (StringUtils.isEmpty(groupTopic)) {
- // both empty then ignore
- if (StringUtils.isEmpty(streamTopic)) {
- continue;
- } else {
- finalTopic = streamTopic;
- }
+ Map<String, String> groupParams =
this.getExtParams(groupIdObj.getExtParams());
+ Map<String, String> streamParams =
this.getExtParams(streamIdObj.getExtParams());
+ this.parseMasterTopic(groupIdObj, streamIdObj, groupParams,
streamParams, inlongIdMap);
+ this.parseBackupTopic(groupIdObj, streamIdObj, groupParams,
streamParams, inlongIdMap);
+ }
+ return inlongIdMap;
+ }
+
+ /**
+ * getExtParams
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, String> getExtParams(String extParams) {
+ // parse extparams
+ if (!StringUtils.isEmpty(extParams)) {
+ try {
+ Map<String, String> groupParams = gson.fromJson(extParams,
HashMap.class);
+ return groupParams;
+ } catch (Exception e) {
+ LOGGER.error("Fail to parse ext error:{},params:{}",
e.getMessage(), extParams, e);
+ }
+ }
+ return new HashMap<>();
+ }
+
+ /**
+ * parseMasterTopic
+ */
+ private void parseMasterTopic(InlongGroupId groupIdObj, InlongStreamId
streamIdObj,
+ Map<String, String> groupParams, Map<String, String> streamParams,
+ Map<String, List<InLongIdObject>> inlongIdMap) {
+ // choose topic
+ String groupTopic = groupIdObj.getTopic();
+ String streamTopic = streamIdObj.getTopic();
+ String finalTopic = null;
+ if (StringUtils.isEmpty(groupTopic)) {
+ // both empty then ignore
+ if (StringUtils.isEmpty(streamTopic)) {
+ return;
} else {
- if (StringUtils.isEmpty(streamTopic)) {
- finalTopic = groupTopic;
- } else {
- // Pulsar: namespace+topic
- finalTopic = groupTopic + "/" + streamTopic;
- }
+ finalTopic = streamTopic;
}
- // concat id
- InLongIdObject obj = new InLongIdObject();
- String inlongId = groupId + "." + streamIdObj.getInlongStreamId();
- obj.setInlongId(inlongId);
- obj.setTopic(finalTopic);
- Map<String, String> params = new HashMap<>();
- obj.setParams(params);
- // parse group extparams
- if (!StringUtils.isEmpty(groupIdObj.getExtParams())) {
- try {
- Map<String, String> groupParams =
gson.fromJson(groupIdObj.getExtParams(), Map.class);
- params.putAll(groupParams);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
+ } else {
+ if (StringUtils.isEmpty(streamTopic)) {
+ finalTopic = groupTopic;
+ } else {
+ // Pulsar: namespace+topic
+ finalTopic = groupTopic + "/" + streamTopic;
}
- // parse stream extparams
- if (!StringUtils.isEmpty(streamIdObj.getExtParams())) {
- try {
- Map<String, String> streamParams =
gson.fromJson(streamIdObj.getExtParams(), Map.class);
- params.putAll(streamParams);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
+ }
+ // concat id
+ InLongIdObject obj = new InLongIdObject();
+ String inlongId = streamIdObj.getInlongGroupId() + "." +
streamIdObj.getInlongStreamId();
+ obj.setInlongId(inlongId);
+ obj.setTopic(finalTopic);
+ Map<String, String> params = new HashMap<>();
+ params.putAll(groupParams);
+ params.putAll(streamParams);
+ obj.setParams(params);
+ inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new
ArrayList<>()).add(obj);
+ }
+
+ /**
+ * parseBackupTopic
+ */
+ private void parseBackupTopic(InlongGroupId groupIdObj, InlongStreamId
streamIdObj,
+ Map<String, String> groupParams, Map<String, String> streamParams,
+ Map<String, List<InLongIdObject>> inlongIdMap) {
+ Map<String, String> params = new HashMap<>();
+ params.putAll(groupParams);
+ params.putAll(streamParams);
+ // find backup cluster tag
+ String clusterTag = params.get(KEY_BACKUP_CLUSTER_TAG);
+ if (StringUtils.isEmpty(clusterTag)) {
+ return;
+ }
+ // find backup topic
+ String groupTopic = groupParams.get(KEY_BACKUP_TOPIC);
+ String streamTopic = streamParams.get(KEY_BACKUP_TOPIC);
+ String finalTopic = null;
+ if (StringUtils.isEmpty(groupTopic)) {
+ // both empty then ignore
+ if (StringUtils.isEmpty(streamTopic)) {
+ return;
+ } else {
+ finalTopic = streamTopic;
+ }
+ } else {
+ if (StringUtils.isEmpty(streamTopic)) {
+ finalTopic = groupTopic;
+ } else {
+ // Pulsar: namespace+topic
+ finalTopic = groupTopic + "/" + streamTopic;
}
- inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new
ArrayList<>()).add(obj);
}
- return inlongIdMap;
+ // concat id
+ InLongIdObject obj = new InLongIdObject();
+ String inlongId = streamIdObj.getInlongGroupId() + "." +
streamIdObj.getInlongStreamId();
+ obj.setInlongId(inlongId);
+ obj.setTopic(finalTopic);
+ obj.setParams(params);
+ inlongIdMap.computeIfAbsent(clusterTag, k -> new
ArrayList<>()).add(obj);
}
/**
@@ -315,4 +463,194 @@ public class DataProxyConfigRepository implements
IRepository {
public String getProxyConfigJson(String clusterName) {
return this.proxyConfigJson.get(clusterName);
}
+
+ /**
+ * changeClusterTag
+ */
+ public String changeClusterTag(String inlongGroupId, String clusterTag,
+ String topic) {
+ try {
+ // select
+ InlongGroupEntity oldGroup =
inlongGroupMapper.selectByGroupId(inlongGroupId);
+ if (oldGroup == null) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+ String oldClusterTag = oldGroup.getInlongClusterTag();
+ if (StringUtils.equals(oldClusterTag, clusterTag)) {
+ return "Cluster tag is same.";
+ }
+ // prepare group
+ final InlongGroupEntity newGroup =
this.prepareClusterTagGroup(oldGroup, clusterTag, topic);
+ // load cluster
+ Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+ ClusterPageRequest clusterRequest = new ClusterPageRequest();
+ List<InlongClusterEntity> clusters =
clusterMapper.selectByCondition(clusterRequest);
+ clusters.forEach((v) -> {
+ clusterMap.put(v.getName(), v);
+ });
+ // prepare stream sink
+ SinkPageRequest request = new SinkPageRequest();
+ request.setInlongGroupId(inlongGroupId);
+ List<StreamSinkEntity> streamSinks =
streamSinkMapper.selectByCondition(request);
+ List<StreamSinkEntity> newStreamSinks = new ArrayList<>();
+ for (StreamSinkEntity streamSink : streamSinks) {
+ String clusterName = streamSink.getInlongClusterName();
+ InlongClusterEntity cluster = clusterMap.get(clusterName);
+ if (cluster == null) {
+ continue;
+ }
+ if (!StringUtils.equals(oldClusterTag,
cluster.getClusterTags())) {
+ continue;
+ }
+ String clusterType = cluster.getType();
+ // find the cluster of same cluster tag and sink type, and add
new stream sink
+ StreamSinkEntity newStreamSink =
this.createNewStreamSink(clusters, clusterType, clusterTag,
+ streamSink);
+ if (newStreamSink != null) {
+ newStreamSinks.add(newStreamSink);
+ }
+ }
+ // update
+ newStreamSinks.forEach((v) -> {
+ streamSinkMapper.insert(v);
+ });
+ inlongGroupMapper.updateByIdentifierSelective(newGroup);
+ return inlongGroupId;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ return e.getMessage();
+ }
+ }
+
+ /**
+ * createNewStreamSink
+ */
+ private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity>
clusters, String clusterType,
+ String clusterTag, StreamSinkEntity srcStreamSink) {
+ for (InlongClusterEntity v : clusters) {
+ if (StringUtils.equals(clusterType, v.getType())
+ && StringUtils.equals(clusterTag, v.getClusterTags())) {
+ String newExtParams = v.getExtParams();
+ Gson gson = new Gson();
+ JsonObject extParams = gson.fromJson(newExtParams,
JsonObject.class);
+ if (extParams.has(KEY_SINK_NAME) &&
extParams.has(KEY_SORT_TASK_NAME)
+ && extParams.has(KEY_DATA_NODE_NAME) &&
extParams.has(KEY_SORT_CONSUEMER_GROUP)) {
+ final String sinkName =
extParams.get(KEY_SINK_NAME).getAsString();
+ final String sortTaskName =
extParams.get(KEY_SORT_TASK_NAME).getAsString();
+ final String dataNodeName =
extParams.get(KEY_DATA_NODE_NAME).getAsString();
+ final String sortConsumerGroup =
extParams.get(KEY_SORT_CONSUEMER_GROUP).getAsString();
+ StreamSinkEntity newStreamSink =
copyStreamSink(srcStreamSink);
+ newStreamSink.setInlongClusterName(v.getName());
+ newStreamSink.setSinkName(sinkName);
+ newStreamSink.setSortTaskName(sortTaskName);
+ newStreamSink.setDataNodeName(dataNodeName);
+ newStreamSink.setSortConsumerGroup(sortConsumerGroup);
+ return newStreamSink;
+ }
+ return null;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * copyStreamSink
+ */
+ private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSink) {
+ try {
+ StreamSinkEntity streamSinkDest = new StreamSinkEntity();
+ BeanUtils.copyProperties(streamSinkDest, streamSink);
+ streamSinkDest.setId(null);
+ streamSinkDest.setModifyTime(new Date(System.currentTimeMillis()));
+ return streamSinkDest;
+ } catch (Exception e) {
+ LOGGER.error("Fail to copy stream sink:{}", e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * prepareClusterTagGroup
+ */
+ private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity
oldGroup, String clusterTag, String topic)
+ throws IllegalAccessException, InvocationTargetException {
+ // parse ext_params
+ String extParams = oldGroup.getExtParams();
+ if (StringUtils.isEmpty(extParams)) {
+ extParams = "{}";
+ }
+ // parse json
+ Gson gson = new Gson();
+ JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+ // change cluster tag
+ extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG,
oldGroup.getInlongClusterTag());
+ extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
+ // copy properties
+ InlongGroupEntity newGroup = new InlongGroupEntity();
+ BeanUtils.copyProperties(newGroup, oldGroup);
+ newGroup.setId(null);
+ // change properties
+ newGroup.setInlongClusterTag(clusterTag);
+ newGroup.setMqResource(topic);
+ String newExtParams = extParamsObj.toString();
+ newGroup.setExtParams(newExtParams);
+ return newGroup;
+ }
+
+ /**
+ * removeBackupClusterTag
+ */
+ public String removeBackupClusterTag(String inlongGroupId) {
+ // select
+ InlongGroupEntity oldGroup =
inlongGroupMapper.selectByGroupId(inlongGroupId);
+ if (oldGroup == null) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+ // parse ext_params
+ String extParams = oldGroup.getExtParams();
+ if (StringUtils.isEmpty(extParams)) {
+ return inlongGroupId;
+ }
+ // parse json
+ Gson gson = new Gson();
+ JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+ if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
+ return inlongGroupId;
+ }
+ final String oldClusterTag =
extParamsObj.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
+ extParamsObj.remove(KEY_BACKUP_CLUSTER_TAG);
+ extParamsObj.remove(KEY_BACKUP_TOPIC);
+ String newExtParams = extParamsObj.toString();
+ oldGroup.setExtParams(newExtParams);
+ // update group
+ inlongGroupMapper.updateByIdentifierSelective(oldGroup);
+
+ // load cluster
+ Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+ ClusterPageRequest clusterRequest = new ClusterPageRequest();
+ List<InlongClusterEntity> clusters =
clusterMapper.selectByCondition(clusterRequest);
+ clusters.forEach((v) -> {
+ clusterMap.put(v.getName(), v);
+ });
+ // prepare stream sink
+ SinkPageRequest request = new SinkPageRequest();
+ request.setInlongGroupId(inlongGroupId);
+ List<StreamSinkEntity> streamSinks =
streamSinkMapper.selectByCondition(request);
+ List<StreamSinkEntity> deleteStreamSinks = new ArrayList<>();
+ for (StreamSinkEntity streamSink : streamSinks) {
+ String clusterName = streamSink.getInlongClusterName();
+ InlongClusterEntity cluster = clusterMap.get(clusterName);
+ if (cluster == null) {
+ continue;
+ }
+ if (StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
+ deleteStreamSinks.add(streamSink);
+ }
+ }
+ // delete old stream sink
+ deleteStreamSinks.forEach((v) -> {
+ streamSinkMapper.deleteByPrimaryKey(v.getId());
+ });
+ return inlongGroupId;
+ }
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
new file mode 100644
index 000000000..ae9ff0114
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.repository;
+
+import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.junit.jupiter.api.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+/**
+ * DataProxyConfigRepositoryTest
+ *
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({InlongGroupEntityMapper.class, StreamSinkEntityMapper.class,
InlongClusterEntityMapper.class})
+public class DataProxyConfigRepositoryTest {
+
+ private static final String INLONG_GROUP_ID = "03a00000026";
+ private static final String INLONG_STREAM_ID = "1";
+ private static final String CLUSTER_TAG_NEW = "ct_new";
+ private static final String CLUSTER_TAG_OLD = "ct_old";
+ private static final String TOPIC_NEW = "t_03a00000026";
+ private static final String TOPIC_OLD = "t_03a00000026";
+ private static final String CLS_CLUSTER_NEW = "cls_new";
+ private static final String CLS_CLUSTER_OLD = "cls_old";
+ private static final String CLS_DATA_NODE_NEW = "sid_cls_new";
+ private static final String CLS_DATA_NODE_OLD = "sid_cls_old";
+
+ @Test
+ public void testChangeClusterTag() {
+ DataProxyConfigRepository repository = new DataProxyConfigRepository();
+ repository.setInlongGroupMapper(this.mockGroupMapper());
+ repository.setClusterMapper(this.mockClusterMapper());
+ repository.setStreamSinkMapper(this.mockStreamSinkMapper());
+ String inlongGroupId = repository.changeClusterTag(INLONG_GROUP_ID,
CLUSTER_TAG_NEW, TOPIC_NEW);
+ assertEquals(inlongGroupId, INLONG_GROUP_ID);
+ }
+
+ private InlongGroupEntityMapper mockGroupMapper() {
+ InlongGroupEntity entity = new InlongGroupEntity();
+ entity.setInlongGroupId(INLONG_GROUP_ID);
+ entity.setInlongClusterTag(CLUSTER_TAG_OLD);
+ entity.setMqResource(TOPIC_OLD);
+ InlongGroupEntityMapper mapper =
PowerMockito.mock(InlongGroupEntityMapper.class);
+
PowerMockito.when(mapper.selectByGroupId(anyString())).thenReturn(entity);
+
PowerMockito.when(mapper.updateByIdentifierSelective(any())).thenReturn(1);
+ return mapper;
+ }
+
+ private InlongClusterEntityMapper mockClusterMapper() {
+ final List<InlongClusterEntity> clusters = new ArrayList<>();
+ InlongClusterEntity dataProxyCluster = new InlongClusterEntity();
+ dataProxyCluster.setName("dp_1");
+ dataProxyCluster.setType("DATA_PROXY");
+ dataProxyCluster.setClusterTags(CLUSTER_TAG_OLD);
+ dataProxyCluster.setExtTag("sz=true");
+ dataProxyCluster.setExtParams("{}");
+ clusters.add(dataProxyCluster);
+ InlongClusterEntity pulsarCluster = new InlongClusterEntity();
+ pulsarCluster.setName("pc_1");
+ pulsarCluster.setType("PULSAR");
+ pulsarCluster.setClusterTags(CLUSTER_TAG_OLD);
+ pulsarCluster.setExtTag("sz=true&producer=true&consumer=true");
+ pulsarCluster.setExtParams("{}");
+ clusters.add(pulsarCluster);
+ InlongClusterEntity clsCluster = new InlongClusterEntity();
+ clsCluster.setName(CLS_CLUSTER_OLD);
+ clsCluster.setType("cls");
+ clsCluster.setClusterTags(CLUSTER_TAG_OLD);
+
clsCluster.setExtParams(String.format("{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\"}",
+ DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_OLD,
+ DataProxyConfigRepository.KEY_SORT_TASK_NAME,
CLS_DATA_NODE_OLD,
+ DataProxyConfigRepository.KEY_DATA_NODE_NAME,
CLS_DATA_NODE_OLD,
+ DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP,
CLS_DATA_NODE_OLD));
+ clusters.add(clsCluster);
+ InlongClusterEntity kafkaCluster = new InlongClusterEntity();
+ kafkaCluster.setName("kafka_1");
+ kafkaCluster.setType("kafka");
+ kafkaCluster.setExtParams("{}");
+ clusters.add(kafkaCluster);
+ InlongClusterEntity clsCluster2 = new InlongClusterEntity();
+ clsCluster2.setName(CLS_CLUSTER_NEW);
+ clsCluster2.setType("cls");
+ clsCluster2.setClusterTags(CLUSTER_TAG_NEW);
+
clsCluster2.setExtParams(String.format("{\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\",\"%s\":\"%s\"}",
+ DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_NEW,
+ DataProxyConfigRepository.KEY_SORT_TASK_NAME,
CLS_DATA_NODE_NEW,
+ DataProxyConfigRepository.KEY_DATA_NODE_NAME,
CLS_DATA_NODE_NEW,
+ DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP,
CLS_DATA_NODE_NEW));
+ clusters.add(clsCluster2);
+ InlongClusterEntityMapper mapper =
PowerMockito.mock(InlongClusterEntityMapper.class);
+
PowerMockito.when(mapper.selectByCondition(any())).thenReturn(clusters);
+ return mapper;
+ }
+
+ private StreamSinkEntityMapper mockStreamSinkMapper() {
+ final List<StreamSinkEntity> streamSinks = new ArrayList<>();
+ StreamSinkEntity clsSink = new StreamSinkEntity();
+ clsSink.setInlongGroupId(INLONG_GROUP_ID);
+ clsSink.setInlongStreamId(INLONG_STREAM_ID);
+ clsSink.setInlongClusterName(CLS_CLUSTER_OLD);
+ clsSink.setSinkName(CLS_DATA_NODE_OLD);
+ clsSink.setSortTaskName(CLS_DATA_NODE_OLD);
+ clsSink.setDataNodeName(CLS_DATA_NODE_OLD);
+ clsSink.setSortConsumerGroup(CLS_DATA_NODE_OLD);
+ clsSink.setSinkType("cls");
+ streamSinks.add(clsSink);
+ StreamSinkEntity kafkaSink = new StreamSinkEntity();
+ kafkaSink.setInlongGroupId(INLONG_GROUP_ID);
+ kafkaSink.setInlongStreamId(INLONG_STREAM_ID);
+ kafkaSink.setInlongClusterName("kafka_1");
+ kafkaSink.setSinkName("sid_" + kafkaSink.getInlongClusterName());
+ kafkaSink.setSortTaskName("sid_" + kafkaSink.getInlongClusterName());
+ kafkaSink.setDataNodeName("sid_" + kafkaSink.getInlongClusterName());
+ kafkaSink.setSortConsumerGroup("sid_" +
kafkaSink.getInlongClusterName());
+ kafkaSink.setSinkType("kafka");
+ streamSinks.add(kafkaSink);
+ StreamSinkEntityMapper mapper =
PowerMockito.mock(StreamSinkEntityMapper.class);
+
PowerMockito.when(mapper.selectByCondition(any())).thenReturn(streamSinks);
+ PowerMockito.when(mapper.insert(any())).thenReturn(1);
+ PowerMockito.when(mapper.deleteByPrimaryKey(anyInt())).thenReturn(1);
+ return mapper;
+ }
+
+ @Test
+ public void testRemoveBackupClusterTag() {
+ final DataProxyConfigRepository repository = new
DataProxyConfigRepository();
+ // group
+ InlongGroupEntityMapper groupMapper = this.mockGroupMapper();
+ InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(INLONG_GROUP_ID);
+ groupEntity.setInlongClusterTag(CLUSTER_TAG_NEW);
+ groupEntity.setMqResource(TOPIC_NEW);
+ groupEntity.setExtParams(
+ String.format("{\"%s\":\"%s\",\"%s\":\"%s\"}",
DataProxyConfigRepository.KEY_BACKUP_CLUSTER_TAG,
+ CLUSTER_TAG_OLD,
DataProxyConfigRepository.KEY_BACKUP_TOPIC, TOPIC_OLD));
+ repository.setInlongGroupMapper(groupMapper);
+ // cluster
+ repository.setClusterMapper(this.mockClusterMapper());
+ // stream sink
+ StreamSinkEntityMapper streamSinkMapper = this.mockStreamSinkMapper();
+ SinkPageRequest sinkPageRequest = new SinkPageRequest();
+ final List<StreamSinkEntity> streamSinks =
streamSinkMapper.selectByCondition(sinkPageRequest);
+ StreamSinkEntity clsSink = new StreamSinkEntity();
+ clsSink.setInlongGroupId(INLONG_GROUP_ID);
+ clsSink.setInlongStreamId(INLONG_STREAM_ID);
+ clsSink.setInlongClusterName(CLS_CLUSTER_NEW);
+ clsSink.setSinkName(CLS_DATA_NODE_NEW);
+ clsSink.setSortTaskName(CLS_DATA_NODE_NEW);
+ clsSink.setDataNodeName(CLS_DATA_NODE_NEW);
+ clsSink.setSortConsumerGroup(CLS_DATA_NODE_NEW);
+ clsSink.setSinkType("cls");
+ streamSinks.add(clsSink);
+ repository.setStreamSinkMapper(streamSinkMapper);
+ // test
+ String inlongGroupId =
repository.removeBackupClusterTag(INLONG_GROUP_ID);
+ assertEquals(INLONG_GROUP_ID, inlongGroupId);
+ }
+}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 518dc50d3..a3b7e1284 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -17,27 +17,30 @@
package org.apache.inlong.manager.web.controller.openapi;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
-import io.swagger.annotations.ApiOperation;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+
/**
* Data proxy controller.
*/
@@ -49,6 +52,8 @@ public class DataProxyController {
@Autowired
@Lazy
private InlongClusterService clusterService;
+ @Autowired
+ private DataProxyConfigRepository dataProxyConfigRepository;
@PostMapping(value = "/dataproxy/getIpList")
@ApiOperation(value = "Get data proxy ip list by cluster name and tag")
@@ -82,4 +87,25 @@ public class DataProxyController {
return clusterService.getAllConfig(clusterName, md5);
}
+ /**
+ * changeClusterTag
+ */
+ @RequestMapping(value = "/changeClusterTag", method = RequestMethod.PUT)
+ @ApiOperation(value = "Change cluster tag and topic of a inlong group id.")
+ public Response<String> changeClusterTag(@RequestParam String
inlongGroupId, @RequestParam String clusterTag,
+ @RequestParam String topic) {
+ String result =
dataProxyConfigRepository.changeClusterTag(inlongGroupId, clusterTag, topic);
+ return Response.success(result);
+ }
+
+ /**
+ * removeBackupClusterTag
+ */
+ @RequestMapping(value = "/removeBackupClusterTag", method =
RequestMethod.PUT)
+ @ApiOperation(value = "remove backup cluster tag and topic of a inlong
group id.")
+ public Response<String> removeBackupClusterTag(@RequestParam String
inlongGroupId) {
+ String result =
dataProxyConfigRepository.removeBackupClusterTag(inlongGroupId);
+ return Response.success(result);
+ }
+
}