EMsnap commented on code in PR #5090:
URL: https://github.com/apache/inlong/pull/5090#discussion_r922942575
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java:
##########
@@ -202,25 +202,43 @@ private void reloadAllSourceConfig() {
.filter(cluster ->
SUPPORTED_MQ_TYPE.contains(cluster.getType()))
.collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
+ // group clusters by name.
+ Map<String, SortSourceClusterInfo> name2ClusterInfos =
clusterInfos.stream()
+ .collect(Collectors.toMap(SortSourceClusterInfo::getName, info
-> info, (g1, g2) -> g1));
+
// Prepare CacheZones for each cluster and task
Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
Map<String, Map<String, CacheZoneConfig>> newConfigMap = new
ConcurrentHashMap<>();
- groupMap.forEach((cluster, task2Group) -> {
-
+ groupMap.forEach((clusterName, task2Group) -> {
+
+ // if there is no matched cluster name, just skip
+ if (!name2ClusterInfos.containsKey(clusterName)) {
+ return;
+ }
+ // find valid mq cluster list
+ String clusterTag =
name2ClusterInfos.get(clusterName).getClusterTags();
+ final Map<String, List<SortSourceClusterInfo>> validClusterInfos =
new ConcurrentHashMap<>();
+ if (allTag2ClusterInfos.containsKey(clusterTag)) {
+ validClusterInfos.put(clusterTag,
allTag2ClusterInfos.get(clusterTag));
+ } else {
+ validClusterInfos.putAll(allTag2ClusterInfos);
+ }
+
+ // prepare
Review Comment:
pls make comment more clear
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]