This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ae3f850e [INLONG-9222][Manager] SortStandalone configuration support multiple tags (#9637)
55ae3f850e is described below

commit 55ae3f850e8d1f57d6806d262d0a59c7a43df8b9
Author: vernedeng <[email protected]>
AuthorDate: Wed Jan 31 21:10:09 2024 +0800

    [INLONG-9222][Manager] SortStandalone configuration support multiple tags (#9637)
---
 .../sort/standalone/SortSourceClusterInfo.java     | 28 ++++++++++++++++++++++
 .../service/core/impl/SortSourceServiceImpl.java   | 22 +++++++++++------
 2 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
index 59e7d35386..8e85fd8287 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
@@ -17,7 +17,10 @@
 
 package org.apache.inlong.manager.pojo.sort.standalone;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
+
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
@@ -25,9 +28,12 @@ import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Data
@@ -46,9 +52,17 @@ public class SortSourceClusterInfo {
     String clusterTags;
     String extTag;
     String extParams;
+    Set<String> clusterTagsSet;
     Map<String, String> extTagMap = new ConcurrentHashMap<>();
     Map<String, String> extParamsMap = new ConcurrentHashMap<>();
 
+    public Set<String> getClusterTagsSet() {
+        if (CollectionUtils.isEmpty(clusterTagsSet) && StringUtils.isNotBlank(clusterTags)) {
+            clusterTagsSet = ImmutableSet.copyOf(clusterTags.split(InlongConstants.COMMA));
+        }
+        return clusterTagsSet;
+    }
+
     public Map<String, String> getExtParamsMap() {
         if (extParamsMap.isEmpty() && extParams != null) {
             try {
@@ -84,4 +98,18 @@ public class SortSourceClusterInfo {
         String isConsumable = this.getExtTagMap().get(KEY_IS_CONSUMABLE);
         return isConsumable == null || "true".equalsIgnoreCase(isConsumable);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SortSourceClusterInfo)) {
+            return false;
+        }
+        SortSourceClusterInfo other = (SortSourceClusterInfo) o;
+        return Objects.equals(this.name, other.name)
+                && Objects.equals(this.clusterTags, other.clusterTags)
+                && Objects.equals(this.type, other.type)
+                && Objects.equals(this.extParams, other.extParams)
+                && Objects.equals(this.extTag, other.extTag);
+
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index cafe428f37..43e07f985c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.service.core.SortSourceService;
 
 import com.google.gson.Gson;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,10 +198,17 @@ public class SortSourceServiceImpl implements SortSourceService {
                 .collect(Collectors.toMap(SortSourceClusterInfo::getName, v -> v));
 
         // group mq clusters by cluster tag
-        mqClusters = allClusters.stream()
+        mqClusters = new HashMap<>();
+        allClusters.stream()
                 .filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
                 .filter(SortSourceClusterInfo::isConsumable)
-                .collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
+                .forEach(mq -> {
+                    Set<String> tags = mq.getClusterTagsSet();
+                    tags.forEach(tag -> {
+                        List<SortSourceClusterInfo> list = mqClusters.computeIfAbsent(tag, k -> new ArrayList<>());
+                        list.add(mq);
+                    });
+                });
 
         // reload all stream sinks, to Map<clusterName, Map<taskName, List<groupId>>> format
         List<SortSourceStreamSinkInfo> allStreamSinks = configLoader.loadAllStreamSinks();
@@ -302,7 +310,7 @@ public class SortSourceServiceImpl implements SortSourceService {
             List<SortSourceStreamSinkInfo> sinkList) {
 
         Preconditions.expectNotNull(sortClusters.get(clusterName), "sort cluster should not be NULL");
-        String sortClusterTag = sortClusters.get(clusterName).getClusterTags();
+        Set<String> tags = sortClusters.get(clusterName).getClusterTagsSet();
 
         // get group infos
         List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
@@ -315,10 +323,10 @@ public class SortSourceServiceImpl implements SortSourceService {
         Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
                 .filter(sink -> Objects.nonNull(groupInfos.get(sink.getGroupId())))
                 .filter(sink -> {
-                    if (StringUtils.isBlank(sortClusterTag)) {
+                    if (CollectionUtils.isEmpty(tags)) {
                         return true;
                     }
-                    return sortClusterTag.equals(groupInfos.get(sink.getGroupId()).getClusterTag());
+                    return tags.contains(groupInfos.get(sink.getGroupId()).getClusterTag());
                 })
                 .collect(Collectors.groupingBy(sink -> {
                     SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
@@ -329,10 +337,10 @@ public class SortSourceServiceImpl implements SortSourceService {
         Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
                 .filter(sink -> backupClusterTag.containsKey(sink.getGroupId()))
                 .filter(sink -> {
-                    if (StringUtils.isBlank(sortClusterTag)) {
+                    if (CollectionUtils.isEmpty(tags)) {
                         return true;
                     }
-                    return sortClusterTag.equals(backupClusterTag.get(sink.getGroupId()));
+                    return tags.contains(backupClusterTag.get(sink.getGroupId()));
                 })
                 .collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
 

Reply via email to