fuweng11 commented on code in PR #9353:
URL: https://github.com/apache/inlong/pull/9353#discussion_r1408668116


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +697,134 @@ private void chkUnmodifiableParams(InlongGroupEntity 
entity, InlongGroupRequest
                 String.format("record has expired with record version=%d, 
request version=%d",
                         entity.getVersion(), request.getVersion()));
     }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+    public Boolean startTagSwitch(String groupId, String clusterTag) {
+        LOGGER.info("start to switch cluster tag for group={}, target tag={}", 
groupId, clusterTag);
+
+        InlongGroupInfo groupInfo = this.get(groupId);
+
+        // check if the group is under switching
+        List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+        Set<String> keys = groupExt.stream()
+                .map(InlongGroupExtInfo::getKeyName)
+                .collect(Collectors.toSet());
+
+        if (keys.contains(BACKUP_CLUSTER_TAG) || 
keys.contains(BACKUP_MQ_RESOURCE)) {
+            String errMsg = String.format("switch failed, current group is 
under switching, group=[%s]", groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // check if the cluster tag is under current tenant
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if (groupEntity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        TenantClusterTagEntity tenantClusterTag =
+                tenantClusterTagMapper.selectByUniqueKey(clusterTag, 
groupEntity.getTenant());
+        if (tenantClusterTag == null) {
+            LOGGER.error("tenant cluster not found for tenant={}, cluster={}", 
groupEntity.getTenant(), clusterTag);
+            throw new 
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+        }
+
+        // check if all sink related sort cluster has the target cluster tag
+        List<StreamSink> sinks = 
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+        for (StreamSink sink : sinks) {
+            String clusterName = sink.getInlongClusterName();
+            InlongClusterEntity clusterEntity =
+                    clusterEntityMapper.selectByNameAndType(clusterName, null);
+            if (clusterEntity == null) {
+                String errMsg = String.format("find no cluster with 
name=[%s]", clusterName);
+                LOGGER.error(errMsg);
+                throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND, 
errMsg);
+            }
+
+            Set<String> tags = 
ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
+            if (!tags.isEmpty() && !tags.contains(clusterTag)) {
+                String errMsg = String.format("find no cluster tag=[%s] in 
cluster name=[%s]", clusterTag, clusterName);
+                LOGGER.error(errMsg);
+                throw new 
BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
+            }
+
+        }
+
+        // config cluster tag and backup_cluster_tag
+        UserInfo userInfo = LoginUserUtils.getLoginUser();
+        InlongGroupRequest request = groupInfo.genRequest();
+        String oldClusterTag = request.getInlongClusterTag();
+        request.setInlongClusterTag(clusterTag);
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, 
BACKUP_CLUSTER_TAG, oldClusterTag));
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, 
BACKUP_MQ_RESOURCE, request.getMqResource()));
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, 
CLUSTER_SWITCH_TIME,
+                LocalDateTime.now().toString()));
+        this.update(request, userInfo.getName());
+
+        // trigger group workflow to rebuild configs
+        this.triggerWorkFlow(groupInfo, userInfo);
+        LOGGER.info("success to switch cluster tag for group={}, target 
tag={}", groupId, clusterTag);
+        return true;
+    }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+    public Boolean finishTagSwitch(String groupId) {
+        LOGGER.info("start to finish switch cluster tag for group={}", 
groupId);
+
+        InlongGroupInfo groupInfo = this.get(groupId);

Review Comment:
   Please add a status check for the group to ensure that the switch has been 
completed.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +697,134 @@ private void chkUnmodifiableParams(InlongGroupEntity 
entity, InlongGroupRequest
                 String.format("record has expired with record version=%d, 
request version=%d",
                         entity.getVersion(), request.getVersion()));
     }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+    public Boolean startTagSwitch(String groupId, String clusterTag) {
+        LOGGER.info("start to switch cluster tag for group={}, target tag={}", 
groupId, clusterTag);
+
+        InlongGroupInfo groupInfo = this.get(groupId);
+
+        // check if the group is under switching
+        List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+        Set<String> keys = groupExt.stream()
+                .map(InlongGroupExtInfo::getKeyName)
+                .collect(Collectors.toSet());
+
+        if (keys.contains(BACKUP_CLUSTER_TAG) || 
keys.contains(BACKUP_MQ_RESOURCE)) {
+            String errMsg = String.format("switch failed, current group is 
under switching, group=[%s]", groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // check if the cluster tag is under current tenant
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if (groupEntity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        TenantClusterTagEntity tenantClusterTag =
+                tenantClusterTagMapper.selectByUniqueKey(clusterTag, 
groupEntity.getTenant());
+        if (tenantClusterTag == null) {
+            LOGGER.error("tenant cluster not found for tenant={}, cluster={}", 
groupEntity.getTenant(), clusterTag);
+            throw new 
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+        }
+
+        // check if all sink related sort cluster has the target cluster tag
+        List<StreamSink> sinks = 
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+        for (StreamSink sink : sinks) {
+            String clusterName = sink.getInlongClusterName();
+            InlongClusterEntity clusterEntity =
+                    clusterEntityMapper.selectByNameAndType(clusterName, null);

Review Comment:
   Suggest using `clusterEntityMapper.selectByKey`.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +697,134 @@ private void chkUnmodifiableParams(InlongGroupEntity 
entity, InlongGroupRequest
                 String.format("record has expired with record version=%d, 
request version=%d",
                         entity.getVersion(), request.getVersion()));
     }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+    public Boolean startTagSwitch(String groupId, String clusterTag) {
+        LOGGER.info("start to switch cluster tag for group={}, target tag={}", 
groupId, clusterTag);
+
+        InlongGroupInfo groupInfo = this.get(groupId);
+
+        // check if the group is under switching
+        List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+        Set<String> keys = groupExt.stream()
+                .map(InlongGroupExtInfo::getKeyName)
+                .collect(Collectors.toSet());
+
+        if (keys.contains(BACKUP_CLUSTER_TAG) || 
keys.contains(BACKUP_MQ_RESOURCE)) {
+            String errMsg = String.format("switch failed, current group is 
under switching, group=[%s]", groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // check if the cluster tag is under current tenant
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if (groupEntity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        TenantClusterTagEntity tenantClusterTag =
+                tenantClusterTagMapper.selectByUniqueKey(clusterTag, 
groupEntity.getTenant());
+        if (tenantClusterTag == null) {
+            LOGGER.error("tenant cluster not found for tenant={}, cluster={}", 
groupEntity.getTenant(), clusterTag);
+            throw new 
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+        }
+
+        // check if all sink related sort cluster has the target cluster tag
+        List<StreamSink> sinks = 
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+        for (StreamSink sink : sinks) {
+            String clusterName = sink.getInlongClusterName();
+            InlongClusterEntity clusterEntity =
+                    clusterEntityMapper.selectByNameAndType(clusterName, null);
+            if (clusterEntity == null) {
+                String errMsg = String.format("find no cluster with 
name=[%s]", clusterName);
+                LOGGER.error(errMsg);
+                throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND, 
errMsg);
+            }
+
+            Set<String> tags = 
ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
+            if (!tags.isEmpty() && !tags.contains(clusterTag)) {
+                String errMsg = String.format("find no cluster tag=[%s] in 
cluster name=[%s]", clusterTag, clusterName);
+                LOGGER.error(errMsg);
+                throw new 
BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
+            }
+
+        }
+
+        // config cluster tag and backup_cluster_tag
+        UserInfo userInfo = LoginUserUtils.getLoginUser();
+        InlongGroupRequest request = groupInfo.genRequest();
+        String oldClusterTag = request.getInlongClusterTag();
+        request.setInlongClusterTag(clusterTag);
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, 
BACKUP_CLUSTER_TAG, oldClusterTag));
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, 
BACKUP_MQ_RESOURCE, request.getMqResource()));
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, 
CLUSTER_SWITCH_TIME,
+                LocalDateTime.now().toString()));
+        this.update(request, userInfo.getName());
+
+        // trigger group workflow to rebuild configs
+        this.triggerWorkFlow(groupInfo, userInfo);
+        LOGGER.info("success to switch cluster tag for group={}, target 
tag={}", groupId, clusterTag);
+        return true;
+    }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+    public Boolean finishTagSwitch(String groupId) {
+        LOGGER.info("start to finish switch cluster tag for group={}", 
groupId);

Review Comment:
   finish?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to