This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit b04d4472ebffaed97f0bd1ac389392ca66b494c6 Author: vernedeng <[email protected]> AuthorDate: Tue Nov 8 19:20:52 2022 +0800 [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427) --- .../resources/mappers/InlongStreamEntityMapper.xml | 3 +- .../resources/mappers/StreamSinkEntityMapper.xml | 1 + .../pojo/sort/standalone/SortSourceStreamInfo.java | 23 ++++++ .../sort/standalone/SortSourceStreamSinkInfo.java | 1 + .../service/core/impl/SortSourceServiceImpl.java | 96 +++++++++++++--------- .../manager/service/sort/SortServiceImplTest.java | 24 +++--- 6 files changed, 97 insertions(+), 51 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml index 231769ccc..9fec7e7cd 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml @@ -289,7 +289,8 @@ select inlong_group_id, inlong_stream_id, - mq_resource + mq_resource, + ext_params from inlong_stream where is_deleted = 0 </select> diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml index f6422643c..996c85cf4 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml @@ -372,6 +372,7 @@ select inlong_cluster_name as sortClusterName, sort_task_name, inlong_group_id as groupId, + inlong_stream_id as streamId, ext_params from stream_sink where is_deleted = 0 diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java index bd89a573c..294212c27 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java @@ -17,11 +17,34 @@ package org.apache.inlong.manager.pojo.sort.standalone; +import com.google.gson.Gson; import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @Data public class SortSourceStreamInfo { + private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceStreamInfo.class); + private static final Gson GSON = new Gson(); + private String inlongGroupId; private String inlongStreamId; private String mqResource; + String extParams; + Map<String, String> extParamsMap = new ConcurrentHashMap<>(); + + public Map<String, String> getExtParamsMap() { + if (extParamsMap.isEmpty() && StringUtils.isNotBlank(extParams)) { + try { + extParamsMap = GSON.fromJson(extParams, Map.class); + } catch (Throwable t) { + LOGGER.error("fail to parse group ext params", t); + } + } + return extParamsMap; + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java index d144b2574..917494252 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java @@ -38,6 +38,7 @@ public class SortSourceStreamSinkInfo { String sortClusterName; String sortTaskName; String groupId; + String streamId; String extParams; Map<String, String> extParamsMap; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java index 4ec62c36d..df92a55d9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java @@ -50,7 +50,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -95,11 +94,11 @@ public class SortSourceServiceImpl implements SortSourceService { private Map<String, List<SortSourceClusterInfo>> mqClusters; private Map<String, SortSourceGroupInfo> groupInfos; - private Map<String, SortSourceStreamInfo> allStreams; + private Map<String, Map<String, SortSourceStreamInfo>> allStreams; private Map<String, String> backupClusterTag; private Map<String, String> backupGroupMqResource; - private Map<String, String> backupStreamMqResource; - private Map<String, Map<String, List<String>>> groupMap; + private Map<String, Map<String, String>> backupStreamMqResource; + private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap; @Autowired private SortConfigLoader configLoader; @@ -195,16 +194,16 @@ public class SortSourceServiceImpl implements SortSourceService { // reload all stream sinks, to Map<clusterName, Map<taskName, List<groupId>>> format List<SortSourceStreamSinkInfo> allStreamSinks = configLoader.loadAllStreamSinks(); - groupMap = new HashMap<>(); + streamSinkMap = new HashMap<>(); allStreamSinks.stream() .filter(sink -> sink.getSortClusterName() != null) .filter(sink -> sink.getSortTaskName() != null) .forEach(sink -> { - Map<String, List<String>> task2groupsMap = - groupMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>()); - List<String> groupIdList = + Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap = + streamSinkMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>()); + List<SortSourceStreamSinkInfo> sinkInfoList = task2groupsMap.computeIfAbsent(sink.getSortTaskName(), k -> new ArrayList<>()); - groupIdList.add(sink.getGroupId()); + sinkInfoList.add(sink); }); // reload all groups @@ -225,12 +224,15 @@ public class SortSourceServiceImpl implements SortSourceService { // reload all streams allStreams = configLoader.loadAllStreams() .stream() - .collect(Collectors.toMap(SortSourceStreamInfo::getInlongGroupId, stream -> stream)); + .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId, + Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info))); // reload all back up stream mq resource backupStreamMqResource = configLoader.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE) .stream() - .collect(Collectors.toMap(InlongStreamExtEntity::getInlongGroupId, InlongStreamExtEntity::getKeyValue)); + .collect(Collectors.groupingBy(InlongStreamExtEntity::getInlongGroupId, + Collectors.toMap(InlongStreamExtEntity::getInlongStreamId, + InlongStreamExtEntity::getKeyValue))); } private void parseAll() { @@ -239,12 +241,12 @@ public class SortSourceServiceImpl implements SortSourceService { Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>(); Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>(); - groupMap.forEach((sortClusterName, task2GroupList) -> { + streamSinkMap.forEach((sortClusterName, task2SinkList) -> { // prepare the new config and md5 Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>(); Map<String, String> task2Md5 = new ConcurrentHashMap<>(); - task2GroupList.forEach((taskName, groupList) -> { + task2SinkList.forEach((taskName, sinkList) -> { try { CacheZoneConfig cacheZoneConfig = CacheZoneConfig.builder() @@ -252,7 +254,7 @@ public class SortSourceServiceImpl implements SortSourceService { .sortTaskId(taskName) .build(); Map<String, CacheZone> cacheZoneMap = - this.parseCacheZones(sortClusterName, taskName, groupList); + this.parseCacheZones(sinkList); cacheZoneConfig.setCacheZones(cacheZoneMap); // prepare md5 @@ -277,31 +279,33 @@ public class SortSourceServiceImpl implements SortSourceService { backupClusterTag = null; backupGroupMqResource = null; backupStreamMqResource = null; - groupMap = null; + streamSinkMap = null; } private Map<String, CacheZone> parseCacheZones( - String sortClusterName, - String taskName, - List<String> groupIdList) { + List<SortSourceStreamSinkInfo> sinkList) { // get group infos - List<SortSourceGroupInfo> groupInfoList = groupIdList.stream() - .filter(groupInfos::containsKey) - .map(groupInfos::get) + List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream() + .filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId()) + && allStreams.containsKey(sinkInfo.getGroupId()) + && allStreams.get(sinkInfo.getGroupId()).containsKey(sinkInfo.getStreamId())) .collect(Collectors.toList()); // group them by cluster tag. - Map<String, List<SortSourceGroupInfo>> tag2GroupInfos = groupInfoList.stream() - .collect(Collectors.groupingBy(SortSourceGroupInfo::getClusterTag)); + Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream() + .collect(Collectors.groupingBy(sink -> { + SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId()); + return groupInfo.getClusterTag(); + })); // group them by second cluster tag. - Map<String, List<SortSourceGroupInfo>> backupTag2GroupInfos = groupInfoList.stream() + Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream() .filter(info -> backupClusterTag.containsKey(info.getGroupId())) .collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId()))); - List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2GroupInfos, false); - List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2GroupInfos, true); + List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false); + List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2SinkInfos, true); return Stream.of(cacheZones, backupCacheZones) .flatMap(Collection::stream) @@ -315,18 +319,20 @@ public class SortSourceServiceImpl implements SortSourceService { ); } - private List<CacheZone> parseCacheZonesByTag(Map<String, List<SortSourceGroupInfo>> tag2Groups, boolean isBackup) { + private List<CacheZone> parseCacheZonesByTag( + Map<String, List<SortSourceStreamSinkInfo>> tag2Sinks, + boolean isBackup) { - return tag2Groups.keySet().stream() + return tag2Sinks.keySet().stream() .filter(mqClusters::containsKey) .flatMap(tag -> { - List<SortSourceGroupInfo> groups = tag2Groups.get(tag); + List<SortSourceStreamSinkInfo> sinks = tag2Sinks.get(tag); List<SortSourceClusterInfo> clusters = mqClusters.get(tag); return clusters.stream() .map(cluster -> { CacheZone zone = null; try { - zone = this.parseCacheZone(groups, cluster, isBackup); + zone = this.parseCacheZone(sinks, cluster, isBackup); } catch (IllegalStateException e) { LOGGER.error("fail to init cache zone for cluster " + cluster, e); } @@ -337,11 +343,11 @@ public class SortSourceServiceImpl implements SortSourceService { } private CacheZone parseCacheZone( - List<SortSourceGroupInfo> groups, + List<SortSourceStreamSinkInfo> sinks, SortSourceClusterInfo cluster, boolean isBackupTag) { switch (cluster.getType()) { - case ClusterType.PULSAR: return parsePulsarZone(groups, cluster, isBackupTag); + case ClusterType.PULSAR: return parsePulsarZone(sinks, cluster, isBackupTag); default: throw new BusinessException(String.format("do not support cluster type=%s of cluster=%s", cluster.getType(), cluster)); @@ -349,24 +355,34 @@ public class SortSourceServiceImpl implements SortSourceService { } private CacheZone parsePulsarZone( - List<SortSourceGroupInfo> groups, + List<SortSourceStreamSinkInfo> sinks, SortSourceClusterInfo cluster, boolean isBackupTag) { Map<String, String> param = cluster.getExtParamsMap(); String tenant = param.get(KEY_TENANT); String auth = param.get(KEY_AUTH); - List<Topic> sdkTopics = groups.stream() - .map(info -> { - String namespace = info.getMqResource(); - String topic = allStreams.get(info.getGroupId()).getMqResource(); + List<Topic> sdkTopics = sinks.stream() + .map(sink -> { + String groupId = sink.getGroupId(); + String streamId = sink.getStreamId(); + SortSourceGroupInfo groupInfo = groupInfos.get(groupId); + SortSourceStreamInfo streamInfo = allStreams.get(groupId).get(streamId); + + String namespace = groupInfo.getMqResource(); + String topic = streamInfo.getMqResource(); if (isBackupTag) { - namespace = Optional.ofNullable(backupGroupMqResource.get(info.getGroupId())).orElse(namespace); - topic = Optional.ofNullable(backupStreamMqResource.get(info.getGroupId())).orElse(topic); + if (backupGroupMqResource.containsKey(groupId)) { + namespace = backupGroupMqResource.get(groupId); + } + if (backupStreamMqResource.containsKey(groupId) + && backupStreamMqResource.get(groupId).containsKey(streamId)) { + topic = backupStreamMqResource.get(groupId).get(streamId); + } } String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic); return Topic.builder() .topic(fullTopic) - .topicProperties(info.getExtParamsMap()) + .topicProperties(streamInfo.getExtParamsMap()) .build(); }) .collect(Collectors.toList()); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java index 338ee3276..d52b47486 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java @@ -69,10 +69,12 @@ public class SortServiceImplTest extends ServiceBaseTest { private static final String TEST_CLUSTER = "testCluster"; private static final String TEST_TASK = "testTask"; private static final String TEST_GROUP = "testGroup"; - private static final String TEST_STREAM = "1"; + private static final String TEST_STREAM_1 = "1"; + private static final String TEST_STREAM_2 = "2"; private static final String TEST_TAG = "testTag"; private static final String BACK_UP_TAG = "testBackupTag"; - private static final String TEST_TOPIC = "testTopic"; + private static final String TEST_TOPIC_1 = "testTopic"; + private static final String TEST_TOPIC_2 = "testTopic2"; private static final String TEST_SINK_TYPE = "testSinkType"; private static final String TEST_CREATOR = "testUser"; @@ -204,11 +206,13 @@ public class SortServiceImplTest extends ServiceBaseTest { private void prepareAll() { this.prepareCluster(TEST_CLUSTER); this.preparePulsar("testPulsar", true, TEST_TAG); - this.preparePulsar("testPulsar2", true, BACK_UP_TAG); + this.preparePulsar("backupPulsar", true, BACK_UP_TAG); this.prepareDataNode(TEST_TASK); this.prepareGroupId(TEST_GROUP); - this.prepareStreamId(TEST_GROUP, TEST_STREAM); - this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER); + this.prepareStreamId(TEST_GROUP, TEST_STREAM_1, TEST_TOPIC_1); + this.prepareStreamId(TEST_GROUP, TEST_STREAM_2, TEST_TOPIC_2); + this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_1); + this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_2); } private void prepareDataNode(String taskName) { @@ -255,12 +259,12 @@ public class SortServiceImplTest extends ServiceBaseTest { groupService.save(request, "test operator"); } - private void prepareStreamId(String groupId, String streamId) { + private void prepareStreamId(String groupId, String streamId, String topic) { InlongStreamRequest request = new InlongStreamRequest(); request.setInlongGroupId(groupId); request.setInlongStreamId(streamId); request.setName("test_stream_name"); - request.setMqResource(TEST_TOPIC); + request.setMqResource(topic); request.setVersion(InlongConstants.INITIAL_VERSION); List<InlongStreamExtInfo> extInfos = new ArrayList<>(); InlongStreamExtInfo ext = new InlongStreamExtInfo(); @@ -268,7 +272,7 @@ public class SortServiceImplTest extends ServiceBaseTest { ext.setInlongStreamId(streamId); ext.setInlongGroupId(groupId); ext.setKeyName(ClusterSwitch.BACKUP_MQ_RESOURCE); - ext.setKeyValue("backup_topic"); + ext.setKeyValue("backup_" + topic); request.setExtList(extInfos); streamService.save(request, "test_operator"); } @@ -305,7 +309,7 @@ public class SortServiceImplTest extends ServiceBaseTest { clusterService.save(request, "test operator"); } - private void prepareTask(String taskName, String groupId, String clusterName) { + private void prepareTask(String taskName, String groupId, String clusterName, String streamId) { SinkRequest request = new HiveSinkRequest(); request.setDataNodeName(taskName); request.setSinkType(SinkType.HIVE); @@ -313,7 +317,7 @@ public class SortServiceImplTest extends ServiceBaseTest { request.setSinkName(taskName); request.setSortTaskName(taskName); request.setInlongGroupId(groupId); - request.setInlongStreamId("1"); + request.setInlongStreamId(streamId); Map<String, Object> properties = new HashMap<>(); properties.put("delimiter", "|"); properties.put("dataType", "text");
