This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fa0cd9b8 [INLONG-5088][Manager] Support only consumes the MQ cluster 
with the same tag (#5090)
3fa0cd9b8 is described below

commit 3fa0cd9b8fe8284a58f6fb3fef59132f91d7c23a
Author: vernedeng <[email protected]>
AuthorDate: Mon Jul 18 14:20:23 2022 +0800

    [INLONG-5088][Manager] Support only consumes the MQ cluster with the same 
tag (#5090)
---
 .../service/core/impl/SortSourceServiceImpl.java   | 32 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4475910aa..123cc47ad 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -202,25 +202,43 @@ public class SortSourceServiceImpl implements 
SortSourceService {
                 .filter(cluster -> 
SUPPORTED_MQ_TYPE.contains(cluster.getType()))
                 
.collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
 
+        // group clusters by name.
+        Map<String, SortSourceClusterInfo> name2ClusterInfos = 
clusterInfos.stream()
+                .collect(Collectors.toMap(SortSourceClusterInfo::getName, info 
-> info, (g1, g2) -> g1));
+
         // Prepare CacheZones for each cluster and task
         Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
         Map<String, Map<String, CacheZoneConfig>> newConfigMap = new 
ConcurrentHashMap<>();
-        groupMap.forEach((cluster, task2Group) -> {
-
+        groupMap.forEach((clusterName, task2Group) -> {
+
+            // if there is no matched cluster name, just skip
+            if (!name2ClusterInfos.containsKey(clusterName)) {
+                return;
+            }
+            // find valid mq cluster list
+            String clusterTag = 
name2ClusterInfos.get(clusterName).getClusterTags();
+            final Map<String, List<SortSourceClusterInfo>> validClusterInfos = 
new ConcurrentHashMap<>();
+            if (allTag2ClusterInfos.containsKey(clusterTag)) {
+                validClusterInfos.put(clusterTag, 
allTag2ClusterInfos.get(clusterTag));
+            } else {
+                validClusterInfos.putAll(allTag2ClusterInfos);
+            }
+
+            // prepare the new config and md5
             Map<String, CacheZoneConfig> task2Config = new 
ConcurrentHashMap<>();
             Map<String, String> task2Md5 = new ConcurrentHashMap<>();
 
             task2Group.forEach((task, groupList) -> {
                 Map<String, CacheZone> cacheZones;
                 try {
-                    cacheZones = this.getCacheZones(groupList, 
allId2GroupInfos, allTag2ClusterInfos);
+                    cacheZones = this.getCacheZones(groupList, 
allId2GroupInfos, validClusterInfos);
                 } catch (Throwable t) {
-                    LOGGER.error("fail to get cacheZones of cluster {}, task 
{}", cluster, task);
+                    LOGGER.error("fail to get cacheZones of clusterName {}, 
task {}", clusterName, task);
                     return;
                 }
                 CacheZoneConfig config = CacheZoneConfig.builder()
                         .cacheZones(cacheZones)
-                        .sortClusterName(cluster)
+                        .sortClusterName(clusterName)
                         .sortTaskId(task)
                         .build();
                 String jsonStr = GSON.toJson(config);
@@ -229,8 +247,8 @@ public class SortSourceServiceImpl implements 
SortSourceService {
                 task2Md5.put(task, md5);
             });
 
-            newConfigMap.put(cluster, task2Config);
-            newMd5Map.put(cluster, task2Md5);
+            newConfigMap.put(clusterName, task2Config);
+            newMd5Map.put(clusterName, task2Md5);
         });
 
         sortSourceConfigMap = newConfigMap;

Reply via email to