vernedeng commented on code in PR #9353:
URL: https://github.com/apache/inlong/pull/9353#discussion_r1411494479
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java:
##########
@@ -672,4 +697,134 @@ private void chkUnmodifiableParams(InlongGroupEntity
entity, InlongGroupRequest
String.format("record has expired with record version=%d,
request version=%d",
entity.getVersion(), request.getVersion()));
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean startTagSwitch(String groupId, String clusterTag) {
+ LOGGER.info("start to switch cluster tag for group={}, target tag={}",
groupId, clusterTag);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
+
+ // check if the group is under switching
+ List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+ Set<String> keys = groupExt.stream()
+ .map(InlongGroupExtInfo::getKeyName)
+ .collect(Collectors.toSet());
+
+ if (keys.contains(BACKUP_CLUSTER_TAG) ||
keys.contains(BACKUP_MQ_RESOURCE)) {
+ String errMsg = String.format("switch failed, current group is
under switching, group=[%s]", groupId);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ // check if the cluster tag is under current tenant
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ TenantClusterTagEntity tenantClusterTag =
+ tenantClusterTagMapper.selectByUniqueKey(clusterTag,
groupEntity.getTenant());
+ if (tenantClusterTag == null) {
+ LOGGER.error("tenant cluster not found for tenant={}, cluster={}",
groupEntity.getTenant(), clusterTag);
+ throw new
BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+ }
+
+ // check if all sink related sort cluster has the target cluster tag
+ List<StreamSink> sinks =
streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+ for (StreamSink sink : sinks) {
+ String clusterName = sink.getInlongClusterName();
+ InlongClusterEntity clusterEntity =
+ clusterEntityMapper.selectByNameAndType(clusterName, null);
+ if (clusterEntity == null) {
+ String errMsg = String.format("find no cluster with
name=[%s]", clusterName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
errMsg);
+ }
+
+ Set<String> tags =
ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
+ if (!tags.isEmpty() && !tags.contains(clusterTag)) {
+ String errMsg = String.format("find no cluster tag=[%s] in
cluster name=[%s]", clusterTag, clusterName);
+ LOGGER.error(errMsg);
+ throw new
BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
+ }
+
+ }
+
+ // config cluster tag and backup_cluster_tag
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ InlongGroupRequest request = groupInfo.genRequest();
+ String oldClusterTag = request.getInlongClusterTag();
+ request.setInlongClusterTag(clusterTag);
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_CLUSTER_TAG, oldClusterTag));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
BACKUP_MQ_RESOURCE, request.getMqResource()));
+ request.getExtList().add(new InlongGroupExtInfo(null, groupId,
CLUSTER_SWITCH_TIME,
+ LocalDateTime.now().toString()));
+ this.update(request, userInfo.getName());
+
+ // trigger group workflow to rebuild configs
+ this.triggerWorkFlow(groupInfo, userInfo);
+ LOGGER.info("success to switch cluster tag for group={}, target
tag={}", groupId, clusterTag);
+ return true;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
+ public Boolean finishTagSwitch(String groupId) {
+ LOGGER.info("start to finish switch cluster tag for group={}",
groupId);
+
+ InlongGroupInfo groupInfo = this.get(groupId);
Review Comment:
fixed, thx
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]