vernedeng commented on code in PR #6275:
URL: https://github.com/apache/inlong/pull/6275#discussion_r1005157552


##########
inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml:
##########
@@ -350,6 +350,13 @@
             </if>
         </where>
     </select>
+    <select id="selectAllStreamSinks" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_sink
+        where is_deleted = 0
+        group by inlong_cluster_name, sort_task_name, sort_consumer_group, sink_type, data_node_name

Review Comment:
   fixed, thx



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java:
##########
@@ -177,6 +194,156 @@ public SortSourceConfigResponse getSourceConfig(
 
     }
 
+    private void reloadAllSourceConfigV2() {
+        List<StreamSinkEntity> sinkEntities = streamSinkEntityMapper.selectAllStreamSinks();
+        // convert to Map<clusterName, Map<taskName, List<groupId>>> format.
+        Map<String, Map<String, List<String>>> groupMap = new ConcurrentHashMap<>();
+        sinkEntities.forEach(stream -> {
+            Map<String, List<String>> task2groupsMap =
+                    groupMap.computeIfAbsent(stream.getInlongClusterName(), k -> new ConcurrentHashMap<>());
+            List<String> groupIdList =
+                    task2groupsMap.computeIfAbsent(stream.getSortTaskName(), k -> new ArrayList<>());
+            groupIdList.add(stream.getInlongGroupId());
+        });
+
+        // get all group topic info by groupId
+        Map<String, List<InlongGroupTopicInfo>> allGroupTopicInfo = sinkEntities.stream()
+                .flatMap(entity -> {
+                    InlongGroupTopicInfo topicInfo = groupService.getTopic(entity.getInlongGroupId());
+                    InlongGroupTopicInfo backupTopicInfo = groupService.getBackupTopic(entity.getInlongGroupId());
+                    return Stream.of(topicInfo, backupTopicInfo);
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.groupingBy(InlongGroupTopicInfo::getInlongGroupId));
+
+        // Prepare CacheZones for each cluster and task
+        Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
+        Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
+        groupMap.forEach((clusterName, task2Group) -> {
+
+            // prepare the new config and md5
+            Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
+            Map<String, String> task2Md5 = new ConcurrentHashMap<>();
+
+            task2Group.forEach((taskName, groupList) -> {
+                CacheZoneConfig cacheZoneConfig =
+                        CacheZoneConfig.builder()
+                                .sortClusterName(clusterName)
+                                .sortTaskId(taskName)
+                                .build();
+                List<InlongGroupTopicInfo> relatedGroupInfos = groupList.stream()
+                        .flatMap(groupId -> allGroupTopicInfo.get(groupId).stream())
+                        .collect(Collectors.toList());
+                Map<String, CacheZone> cacheZoneMap = this.parsePulsarCacheZones(relatedGroupInfos);
+                cacheZoneConfig.setCacheZones(cacheZoneMap);
+
+                String jsonStr = GSON.toJson(cacheZoneConfig);
+                String md5 = DigestUtils.md5Hex(jsonStr);
+                task2Config.put(taskName, cacheZoneConfig);
+                task2Md5.put(taskName, md5);
+            });
+
+            newConfigMap.put(clusterName, task2Config);
+            newMd5Map.put(clusterName, task2Md5);
+        });
+
+        sortSourceConfigMap = newConfigMap;
+        sortSourceMd5Map = newMd5Map;
+    }
+
+    private Map<String, CacheZone> parsePulsarCacheZones(List<InlongGroupTopicInfo> groupTopicInfos) {
+        Map<String, CacheZone> cacheZoneMap = new HashMap<>();
+        groupTopicInfos.forEach(info -> this.parseCacheZones(info, cacheZoneMap));
+        return cacheZoneMap;
+    }
+
+    private void parseCacheZones(InlongGroupTopicInfo info, Map<String, CacheZone> cacheZoneMap) {
+        switch (info.getMqType().toUpperCase()) {

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to