healchow commented on code in PR #6275:
URL: https://github.com/apache/inlong/pull/6275#discussion_r1004222088
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java:
##########
@@ -177,6 +194,156 @@ public SortSourceConfigResponse getSourceConfig(
}
+ private void reloadAllSourceConfigV2() {
+ List<StreamSinkEntity> sinkEntities =
streamSinkEntityMapper.selectAllStreamSinks();
+ // convert to Map<clusterName, Map<taskName, List<groupId>>> format.
+ Map<String, Map<String, List<String>>> groupMap = new
ConcurrentHashMap<>();
+ sinkEntities.forEach(stream -> {
+ Map<String, List<String>> task2groupsMap =
+ groupMap.computeIfAbsent(stream.getInlongClusterName(), k
-> new ConcurrentHashMap<>());
+ List<String> groupIdList =
+ task2groupsMap.computeIfAbsent(stream.getSortTaskName(), k
-> new ArrayList<>());
+ groupIdList.add(stream.getInlongGroupId());
+ });
+
+ // get all group topic info by groupId
+ Map<String, List<InlongGroupTopicInfo>> allGroupTopicInfo =
sinkEntities.stream()
+ .flatMap(entity -> {
+ InlongGroupTopicInfo topicInfo =
groupService.getTopic(entity.getInlongGroupId());
+ InlongGroupTopicInfo backupTopicInfo =
groupService.getBackupTopic(entity.getInlongGroupId());
+ return Stream.of(topicInfo, backupTopicInfo);
+ })
+ .filter(Objects::nonNull)
+
.collect(Collectors.groupingBy(InlongGroupTopicInfo::getInlongGroupId));
+
+ // Prepare CacheZones for each cluster and task
+ Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
+ Map<String, Map<String, CacheZoneConfig>> newConfigMap = new
ConcurrentHashMap<>();
+ groupMap.forEach((clusterName, task2Group) -> {
+
+ // prepare the new config and md5
+ Map<String, CacheZoneConfig> task2Config = new
ConcurrentHashMap<>();
+ Map<String, String> task2Md5 = new ConcurrentHashMap<>();
+
+ task2Group.forEach((taskName, groupList) -> {
+ CacheZoneConfig cacheZoneConfig =
+ CacheZoneConfig.builder()
+ .sortClusterName(clusterName)
+ .sortTaskId(taskName)
+ .build();
+ List<InlongGroupTopicInfo> relatedGroupInfos =
groupList.stream()
+ .flatMap(groupId ->
allGroupTopicInfo.get(groupId).stream())
+ .collect(Collectors.toList());
+ Map<String, CacheZone> cacheZoneMap =
this.parsePulsarCacheZones(relatedGroupInfos);
+ cacheZoneConfig.setCacheZones(cacheZoneMap);
+
+ String jsonStr = GSON.toJson(cacheZoneConfig);
+ String md5 = DigestUtils.md5Hex(jsonStr);
+ task2Config.put(taskName, cacheZoneConfig);
+ task2Md5.put(taskName, md5);
+ });
+
+ newConfigMap.put(clusterName, task2Config);
+ newMd5Map.put(clusterName, task2Md5);
+ });
+
+ sortSourceConfigMap = newConfigMap;
+ sortSourceMd5Map = newMd5Map;
+ }
+
+ private Map<String, CacheZone>
parsePulsarCacheZones(List<InlongGroupTopicInfo> groupTopicInfos) {
+ Map<String, CacheZone> cacheZoneMap = new HashMap<>();
+ groupTopicInfos.forEach(info -> this.parseCacheZones(info,
cacheZoneMap));
+ return cacheZoneMap;
+ }
+
+ private void parseCacheZones(InlongGroupTopicInfo info, Map<String,
CacheZone> cacheZoneMap) {
+ switch (info.getMqType().toUpperCase()) {
Review Comment:
No need to upper case, it is an error if it was not matched.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]