vernedeng commented on code in PR #9353:
URL: https://github.com/apache/inlong/pull/9353#discussion_r1413793715


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +699,150 @@ private void chkUnmodifiableParams(InlongGroupEntity 
entity, InlongGroupRequest
                 String.format("record has expired with record version=%d, 
request version=%d",
                         entity.getVersion(), request.getVersion()));
     }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+    public Boolean startTagSwitch(String groupId, String clusterTag) {
+        LOGGER.info("start to switch cluster tag for group={}, target tag={}", 
groupId, clusterTag);
+
+        InlongGroupInfo groupInfo = this.get(groupId);
+
+        // check if the group mode is data sync mode
+        if (groupInfo.getInlongGroupMode() == 1) {
+            String errMSg = String.format("no need to switch sync mode group = 
{}", groupId);
+            LOGGER.error(errMSg);
+            throw new BusinessException(errMSg);
+        }
+
+        // check if the group is under switching
+        List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+        Set<String> keys = groupExt.stream()
+                .map(InlongGroupExtInfo::getKeyName)
+                .collect(Collectors.toSet());
+
+        if (keys.contains(BACKUP_CLUSTER_TAG) || 
keys.contains(BACKUP_MQ_RESOURCE)) {
+            String errMsg = String.format("switch failed, current group is 
under switching, group=[%s]", groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // check if the cluster tag is under current tenant
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if (groupEntity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        TenantClusterTagEntity tenantClusterTag =
+                tenantClusterTagMapper.selectByUniqueKey(clusterTag, 
groupEntity.getTenant());
+        if (tenantClusterTag == null) {
+            LOGGER.error("tenant cluster not found for tenant={}, cluster={}", 
groupEntity.getTenant(), clusterTag);
+            throw new 
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+        }
+
+        // check if all sink related sort cluster has the target cluster tag
+        List<StreamSink> sinks = 
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+        for (StreamSink sink : sinks) {
+            String clusterName = sink.getInlongClusterName();
+            InlongClusterEntity clusterEntity =
+                    clusterEntityMapper.selectByNameAndType(clusterName,

Review Comment:
   Fixed, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to