This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8d853493e4 [INLONG-9314][Manager] Support cluster switch for
InlongGroup (#9353)
8d853493e4 is described below
commit 8d853493e42f8144257c3caeb6df5d1777df714b
Author: vernedeng <[email protected]>
AuthorDate: Tue Dec 5 09:35:15 2023 +0800
[INLONG-9314][Manager] Support cluster switch for InlongGroup (#9353)
---
.../inlong/common/constant/ClusterSwitch.java | 7 +
.../inlong/manager/common/consts/SinkType.java | 15 ++
.../inlong/manager/common/enums/ErrorCodeEnum.java | 2 +
.../dao/mapper/TenantClusterTagEntityMapper.java | 2 +
.../mappers/TenantClusterTagEntityMapper.xml | 8 +
.../manager/service/group/InlongGroupService.java | 4 +
.../service/group/InlongGroupServiceImpl.java | 164 +++++++++++++++++++++
.../AbstractStandaloneSinkResourceOperator.java | 3 +-
.../web/controller/InlongGroupController.java | 18 +++
9 files changed, 222 insertions(+), 1 deletion(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
index 90816730ce..847162da1d 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
@@ -31,4 +31,11 @@ public class ClusterSwitch {
* MQ resource for backup, represents the namespace of Pulsar, the topic
of TubeMQ, etc.
*/
public static final String BACKUP_MQ_RESOURCE = "backup_mq_resource";
+
+ /**
+ * Ext key under which the start time of a cluster switch is stored
+ */
+ public static final String CLUSTER_SWITCH_TIME = "cluster_switch_time";
+
+ public static final int FINISH_SWITCH_INTERVAL_MIN = 10; // minimum minutes between starting and finishing a switch
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index b9b8a17350..68e80e7d07 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -17,9 +17,13 @@
package org.apache.inlong.manager.common.consts;
+import org.apache.inlong.manager.common.enums.ClusterType;
+
import java.lang.reflect.Field;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -74,10 +78,18 @@ public class SinkType extends StreamType {
@SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String CLS = "CLS";
+ public static final Map<String, String> SINK_TO_CLUSTER = new HashMap<>();
+
public static final Set<String> SORT_FLINK_SINK = new HashSet<>();
public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>();
+ // Register the sort cluster type that relatedSortClusterType returns for each of these sink types.
+ static {
+ SINK_TO_CLUSTER.put(CLS, ClusterType.SORT_CLS);
+ SINK_TO_CLUSTER.put(ELASTICSEARCH, ClusterType.SORT_ES);
+ SINK_TO_CLUSTER.put(PULSAR, ClusterType.SORT_PULSAR);
+ }
+
static {
SinkType obj = new SinkType();
Class<? extends SinkType> clazz = obj.getClass();
@@ -98,4 +110,7 @@ public class SinkType extends StreamType {
return sinkTypes.stream().anyMatch(SORT_FLINK_SINK::contains);
}
+ /**
+ * Look up the sort cluster type registered in SINK_TO_CLUSTER for the given sink type.
+ *
+ * @param sinkType sink type used as the map key, e.g. CLS
+ * @return the mapped sort cluster type, or null when the map has no entry for the sink type
+ */
+ public static String relatedSortClusterType(String sinkType) {
+ return SINK_TO_CLUSTER.get(sinkType);
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 289ddba60d..9d20be566d 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -68,6 +68,8 @@ public enum ErrorCodeEnum {
CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
CLUSTER_TAG_NOT_FOUND(1104, "Cluster tag information does not exist"),
+ TENANT_CLUSTER_TAG_NOT_FOUND(1105, "Tenant Cluster tag does not exist"),
+
DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
DATA_NODE_ID_CHANGED(1152, "Data node information's id not equals"),
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
index 95d4beaa46..f9b34f2524 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
@@ -34,6 +34,8 @@ public interface TenantClusterTagEntityMapper {
TenantClusterTagEntity selectByPrimaryKey(Integer id);
+ /**
+ * Select the non-deleted tenant cluster tag record matching the given cluster tag and tenant.
+ * NOTE(review): two parameters are declared without a visible @Param annotation while the XML
+ * statement binds them by name - confirm the names are available to MyBatis at runtime.
+ *
+ * @param clusterTag cluster tag
+ * @param tenant tenant name
+ * @return the matching entity; presumably null when nothing matches (the service caller null-checks)
+ */
+ TenantClusterTagEntity selectByUniqueKey(String clusterTag, String tenant);
+
List<TenantClusterTagEntity> selectByTag(String clusterTag);
List<TenantClusterTagEntity> selectByCondition(TenantClusterTagPageRequest
request);
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
index 8e9e78dd22..3de752e8ca 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
@@ -41,6 +41,14 @@
from tenant_cluster_tag
where id = #{id,jdbcType=INTEGER}
</select>
+ <!-- Select non-deleted tenant_cluster_tag rows matching both the tenant and the cluster tag -->
+ <select id="selectByUniqueKey" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List" />
+ from tenant_cluster_tag
+ where tenant = #{tenant,jdbcType=VARCHAR}
+ and cluster_tag = #{clusterTag,jdbcType=VARCHAR}
+ and is_deleted = 0
+ </select>
<select id="selectByTag" parameterType="java.lang.String"
resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index c344548ddb..d2f8b09048 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -207,4 +207,8 @@ public interface InlongGroupService {
*/
Map<String, Object> detail(String groupId);
+ /**
+ * Start switching the inlong group to another cluster tag.
+ *
+ * @param groupId inlong group id
+ * @param clusterTag target cluster tag
+ * @return whether succeed
+ */
+ Boolean startTagSwitch(String groupId, String clusterTag);
+
+ /**
+ * Finish a cluster tag switch that was previously started for the inlong group.
+ *
+ * @param groupId inlong group id
+ * @return whether succeed
+ */
+ Boolean finishTagSwitch(String groupId);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 62f9ee7da3..2d87b74995 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -20,21 +20,27 @@ package org.apache.inlong.manager.service.group;
import org.apache.inlong.manager.common.auth.Authentication.AuthType;
import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.entity.TenantClusterTagEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
@@ -48,16 +54,22 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserInfo;
+import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.workflow.WorkflowService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.pagehelper.Page;
@@ -76,6 +88,7 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -86,7 +99,11 @@ import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG;
+import static
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
+import static
org.apache.inlong.common.constant.ClusterSwitch.CLUSTER_SWITCH_TIME;
+import static
org.apache.inlong.common.constant.ClusterSwitch.FINISH_SWITCH_INTERVAL_MIN;
import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+import static
org.apache.inlong.manager.workflow.event.process.ProcessEventListener.EXECUTOR_SERVICE;
/**
* Inlong group service layer implementation
@@ -107,11 +124,19 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
@Autowired
private InlongStreamService streamService;
@Autowired
+ private StreamSinkService streamSinkService;
+ @Autowired
private StreamSourceEntityMapper streamSourceMapper;
@Autowired
+ private TenantClusterTagEntityMapper tenantClusterTagMapper;
+ @Autowired
private InlongStreamExtEntityMapper streamExtMapper;
@Autowired
private InlongClusterService clusterService;
+ @Autowired
+ private WorkflowService workflowService;
+ @Autowired
+ private InlongClusterEntityMapper clusterEntityMapper;
@Autowired
private InlongGroupOperatorFactory groupOperatorFactory;
@@ -672,4 +697,143 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={},
clusterTag={}", groupEntity.getTenant(), clusterTag);
+ throw new
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+ }
+
+ // check if all sink related sort cluster has the target cluster tag
+ List<StreamSink> sinks =
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+ for (StreamSink sink : sinks) {
+ String clusterName = sink.getInlongClusterName();
+ List<InlongClusterEntity> clusterEntity =
+ clusterEntityMapper.selectByKey(clusterTag, clusterName,
+
SinkType.relatedSortClusterType(sink.getSinkType()));
+ if (CollectionUtils.isEmpty(clusterEntity) || clusterEntity.size()
!= 1) {
+ String errMsg = String.format("find no cluster or multiple
cluster with clusterName=[%s]", clusterName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
errMsg);
+ }
+
+ }
+
+ // config cluster tag and backup_cluster_tag
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ InlongGroupRequest request = groupInfo.genRequest();
+ String oldClusterTag = request.getInlongClusterTag();
+ request.setInlongClusterTag(clusterTag);
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_CLUSTER_TAG, oldClusterTag));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_MQ_RESOURCE, request.getMqResource()));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
CLUSTER_SWITCH_TIME,
+ LocalDateTime.now().toString()));
+ this.update(request, userInfo.getName());
+
+ // trigger group workflow to rebuild configs
+ this.triggerWorkFlow(groupInfo, userInfo);
+ LOGGER.info("success to switch cluster tag for group={}, target
tag={}", groupId, clusterTag);
+ return true;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean finishTagSwitch(String groupId) {
+ LOGGER.info("start to finish switch cluster tag for group={}",
groupId);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+
+ // check whether the current status supports modification
+ GroupStatus curStatus = GroupStatus.forCode(groupInfo.getStatus());
+ if (GroupStatus.notAllowedUpdate(curStatus)) {
+ String errMsg = String.format("Current status=%s is not allowed to
update", curStatus);
+ LOGGER.error(errMsg);
+ throw new
BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Map<String, InlongGroupExtInfo> extInfoMap = groupExt.stream()
+ .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, v ->
v));
+
+ if (!extInfoMap.containsKey(BACKUP_CLUSTER_TAG) ||
!extInfoMap.containsKey(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("finish switch failed, current group
is not under switching, group=[%s]",
+ groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ InlongGroupExtInfo switchTime = extInfoMap.get(CLUSTER_SWITCH_TIME);
+ LocalDateTime switchStartTime =
+ switchTime == null ? LocalDateTime.MIN :
LocalDateTime.parse(switchTime.getKeyValue());
+
+ // check the switch time
+ LocalDateTime allowSwitchTime =
switchStartTime.plusMinutes(FINISH_SWITCH_INTERVAL_MIN);
+ if (LocalDateTime.now().isBefore(allowSwitchTime)) {
+ String errMsg = String.format("finish switch failed, please retry
until={}", allowSwitchTime);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // remove backup ext info
+ removeExt(extInfoMap.get(BACKUP_CLUSTER_TAG));
+ removeExt(extInfoMap.get(BACKUP_MQ_RESOURCE));
+ removeExt(extInfoMap.get(CLUSTER_SWITCH_TIME));
+
+ // trigger group workflow to rebuild configs
+ this.triggerWorkFlow(groupInfo, userInfo);
+ return true;
+ }
+
+ /**
+ * Build a GroupResourceProcessForm from the group info and the group's stream list, then hand a
+ * CREATE_GROUP_RESOURCE workflow start to EXECUTOR_SERVICE.
+ * NOTE(review): the task is submitted from inside the caller's transaction - confirm the workflow
+ * does not need the caller's changes to be committed first.
+ *
+ * @param groupInfo group info placed into the process form
+ * @param userInfo user passed to the workflow service as the operator
+ */
+ private void triggerWorkFlow(InlongGroupInfo groupInfo, UserInfo userInfo)
{
+ GroupResourceProcessForm processForm = new GroupResourceProcessForm();
+ processForm.setGroupInfo(groupInfo);
+ List<InlongStreamInfo> streamList =
streamService.list(groupInfo.getInlongGroupId());
+ processForm.setStreamInfos(streamList);
+ EXECUTOR_SERVICE.execute(
+ () ->
workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo,
processForm));
+ }
+
+ /**
+ * Delete the given group ext record by its primary key.
+ * Does nothing when the ext info or its id is null.
+ *
+ * @param extInfo ext info to delete, may be null
+ */
+ private void removeExt(InlongGroupExtInfo extInfo) {
+ if (extInfo == null || extInfo.getId() == null) {
+ return;
+ }
+ groupExtMapper.deleteByPrimaryKey(extInfo.getId());
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index 61187e6f8d..dd1c26d9f7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.resource.sink;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -79,7 +80,7 @@ public abstract class AbstractStandaloneSinkResourceOperator
implements SinkReso
private String assignFromRelated(String sinkType, String groupId) {
InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
- String sortClusterType = SORT_PREFIX.concat(sinkType);
+ String sortClusterType = SinkType.relatedSortClusterType(sinkType);
List<InlongClusterEntity> clusters = clusterEntityMapper
.selectByKey(null, null, sortClusterType).stream()
.filter(cluster -> checkCluster(cluster.getClusterTags(),
group.getInlongClusterTag()))
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 2e49fd9380..0ad34ae506 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -40,6 +40,7 @@ import
org.apache.inlong.manager.service.operationlog.OperationLog;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
@@ -206,4 +207,21 @@ public class InlongGroupController {
return Response.success(groupService.detail(groupId));
}
+ /**
+ * Start a cluster tag switch for the group by delegating to groupService.startTagSwitch.
+ * NOTE(review): this call changes group state but is mapped to GET - confirm whether POST is
+ * more appropriate for the clients of this API.
+ */
+ @RequestMapping(value = "/group/switch/start/{groupId}/{clusterTag}",
method = RequestMethod.GET)
+ @ApiOperation(value = "start tag switch")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class, required = true),
+ @ApiImplicitParam(name = "clusterTag", value = "cluster tag",
dataTypeClass = String.class, required = true)
+ })
+ public Response<Boolean> startTagSwitch(@PathVariable String groupId,
@PathVariable String clusterTag) {
+ return Response.success(groupService.startTagSwitch(groupId,
clusterTag));
+ }
+
+ /**
+ * Finish a cluster tag switch for the group by delegating to groupService.finishTagSwitch.
+ * NOTE(review): this call changes group state but is mapped to GET - confirm whether POST is
+ * more appropriate for the clients of this API.
+ */
+ @RequestMapping(value = "/group/switch/finish/{groupId}", method =
RequestMethod.GET)
+ @ApiOperation(value = "finish tag switch")
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class, required = true)
+ public Response<Boolean> finishTagSwitch(@PathVariable String groupId) {
+ return Response.success(groupService.finishTagSwitch(groupId));
+ }
+
}
\ No newline at end of file