fuweng11 commented on code in PR #9353:
URL: https://github.com/apache/inlong/pull/9353#discussion_r1411665429
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
Review Comment:
Please use `groupId=` as the label in this log message, e.g. "start to switch cluster tag for groupId={}, target tag={}".
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
Review Comment:
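Please avoid the magic number `1` and compare against the named constant instead:
InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())
(Note: only `groupInfo` is in scope at this point; `groupEntity` is declared later in the method.)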
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
Review Comment:
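Please label the value as `groupId`. Note that `String.format` does not expand `{}` placeholders, so this should be `groupId=%s`; otherwise the group id is never written into the message.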
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={}, cluster={}",
groupEntity.getTenant(), clusterTag);
Review Comment:
Please use `clusterTag={}` instead of `cluster={}` in this log message.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
Review Comment:
Please use `groupId=[%s]` instead of `group=[%s]` in this error message.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={}, cluster={}",
groupEntity.getTenant(), clusterTag);
+ throw new
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+ }
+
+ // check if all sink related sort cluster has the target cluster tag
+ List<StreamSink> sinks =
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+ for (StreamSink sink : sinks) {
+ String clusterName = sink.getInlongClusterName();
+ InlongClusterEntity clusterEntity =
+ clusterEntityMapper.selectByNameAndType(clusterName,
+
SinkType.relatedSortClusterType(sink.getSinkType()));
+ if (clusterEntity == null) {
+ String errMsg = String.format("find no cluster with
name=[%s]", clusterName);
Review Comment:
Please use `clusterName=[%s]` instead of `name=[%s]` in this error message.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java:
##########
@@ -207,4 +207,8 @@ void updateAfterApprove(
*/
Map<String, Object> detail(String groupId);
+ Boolean startTagSwitch(String groupId, String clusterTag);
Review Comment:
Please add a Javadoc comment for this method, describing its purpose, parameters, and return value.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={}, cluster={}",
groupEntity.getTenant(), clusterTag);
+ throw new
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+ }
+
+ // check if all sink related sort cluster has the target cluster tag
+ List<StreamSink> sinks =
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+ for (StreamSink sink : sinks) {
+ String clusterName = sink.getInlongClusterName();
+ InlongClusterEntity clusterEntity =
+ clusterEntityMapper.selectByNameAndType(clusterName,
+
SinkType.relatedSortClusterType(sink.getSinkType()));
+ if (clusterEntity == null) {
+ String errMsg = String.format("find no cluster with
name=[%s]", clusterName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
errMsg);
+ }
+
+ Set<String> tags =
ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
+ if (!tags.isEmpty() && !tags.contains(clusterTag)) {
+ String errMsg = String.format("find no cluster tag=[%s] in
cluster name=[%s]", clusterTag, clusterName);
+ LOGGER.error(errMsg);
+ throw new
BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
+ }
+
+ }
+
+ // config cluster tag and backup_cluster_tag
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ InlongGroupRequest request = groupInfo.genRequest();
+ String oldClusterTag = request.getInlongClusterTag();
+ request.setInlongClusterTag(clusterTag);
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_CLUSTER_TAG, oldClusterTag));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_MQ_RESOURCE, request.getMqResource()));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
CLUSTER_SWITCH_TIME,
+ LocalDateTime.now().toString()));
+ this.update(request, userInfo.getName());
+
+ // trigger group workflow to rebuild configs
+ this.triggerWorkFlow(groupInfo, userInfo);
+ LOGGER.info("success to switch cluster tag for group={}, target
tag={}", groupId, clusterTag);
+ return true;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean finishTagSwitch(String groupId) {
+ LOGGER.info("start to finish switch cluster tag for group={}",
groupId);
Review Comment:
Please use `groupId={}` instead of `group={}` in this log message.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={}, cluster={}",
groupEntity.getTenant(), clusterTag);
+ throw new
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+ }
+
+ // check if all sink related sort cluster has the target cluster tag
+ List<StreamSink> sinks =
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+ for (StreamSink sink : sinks) {
+ String clusterName = sink.getInlongClusterName();
+ InlongClusterEntity clusterEntity =
+ clusterEntityMapper.selectByNameAndType(clusterName,
+
SinkType.relatedSortClusterType(sink.getSinkType()));
+ if (clusterEntity == null) {
+ String errMsg = String.format("find no cluster with
name=[%s]", clusterName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
errMsg);
+ }
+
+ Set<String> tags =
ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
+ if (!tags.isEmpty() && !tags.contains(clusterTag)) {
+ String errMsg = String.format("find no cluster tag=[%s] in
cluster name=[%s]", clusterTag, clusterName);
+ LOGGER.error(errMsg);
+ throw new
BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
+ }
+
+ }
+
+ // config cluster tag and backup_cluster_tag
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ InlongGroupRequest request = groupInfo.genRequest();
+ String oldClusterTag = request.getInlongClusterTag();
+ request.setInlongClusterTag(clusterTag);
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_CLUSTER_TAG, oldClusterTag));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_MQ_RESOURCE, request.getMqResource()));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
CLUSTER_SWITCH_TIME,
+ LocalDateTime.now().toString()));
+ this.update(request, userInfo.getName());
+
+ // trigger group workflow to rebuild configs
+ this.triggerWorkFlow(groupInfo, userInfo);
+ LOGGER.info("success to switch cluster tag for group={}, target
tag={}", groupId, clusterTag);
Review Comment:
Please use `groupId={}` instead of `group={}` in this log message.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group mode is data sync mode
+ if (groupInfo.getInlongGroupMode() == 1) {
+ String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
+ LOGGER.error(errMSg);
+ throw new BusinessException(errMSg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={}, cluster={}",
groupEntity.getTenant(), clusterTag);
+ throw new
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+ }
+
+ // check if all sink related sort cluster has the target cluster tag
+ List<StreamSink> sinks =
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+ for (StreamSink sink : sinks) {
+ String clusterName = sink.getInlongClusterName();
+ InlongClusterEntity clusterEntity =
+ clusterEntityMapper.selectByNameAndType(clusterName,
+
SinkType.relatedSortClusterType(sink.getSinkType()));
+ if (clusterEntity == null) {
+ String errMsg = String.format("find no cluster with
name=[%s]", clusterName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
errMsg);
+ }
+
+ Set<String> tags =
ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
+ if (!tags.isEmpty() && !tags.contains(clusterTag)) {
+ String errMsg = String.format("find no cluster tag=[%s] in
cluster name=[%s]", clusterTag, clusterName);
+ LOGGER.error(errMsg);
+ throw new
BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
+ }
+
+ }
+
+ // config cluster tag and backup_cluster_tag
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ InlongGroupRequest request = groupInfo.genRequest();
+ String oldClusterTag = request.getInlongClusterTag();
+ request.setInlongClusterTag(clusterTag);
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_CLUSTER_TAG, oldClusterTag));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_MQ_RESOURCE, request.getMqResource()));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
CLUSTER_SWITCH_TIME,
+ LocalDateTime.now().toString()));
+ this.update(request, userInfo.getName());
+
+ // trigger group workflow to rebuild configs
+ this.triggerWorkFlow(groupInfo, userInfo);
+ LOGGER.info("success to switch cluster tag for group={}, target
tag={}", groupId, clusterTag);
+ return true;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean finishTagSwitch(String groupId) {
+ LOGGER.info("start to finish switch cluster tag for group={}",
groupId);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+
+ // check whether the current status supports modification
+ GroupStatus curStatus = GroupStatus.forCode(groupInfo.getStatus());
+ if (GroupStatus.notAllowedUpdate(curStatus)) {
+ String errMsg = String.format("Current status=%s is not allowed to
update", curStatus);
+ LOGGER.error(errMsg);
+ throw new
BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg);
+ }
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Map<String, InlongGroupExtInfo> extInfoMap = groupExt.stream()
+ .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, v ->
v));
+
+ if (!extInfoMap.containsKey(BACKUP_CLUSTER_TAG) ||
!extInfoMap.containsKey(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("finish switch failed, current group
is not under switching, group=[%s]",
Review Comment:
Ditto: please use `groupId=[%s]` instead of `group=[%s]` in this error message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]