This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 7644e71d58c79ed26e447b13a671309a83e2a0c3 Author: haifxu <[email protected]> AuthorDate: Tue Nov 8 19:20:28 2022 +0800 [INLONG-6451][Manager] Optimize the resource process of Kafka MQ (#6455) --- .../manager/pojo/group/kafka/InlongKafkaDTO.java | 4 --- .../manager/pojo/group/kafka/InlongKafkaInfo.java | 4 --- .../pojo/group/kafka/InlongKafkaRequest.java | 4 --- .../resource/queue/kafka/KafkaOperator.java | 31 +++-------------- .../queue/kafka/KafkaResourceOperators.java | 18 ++++------ .../service/resource/queue/kafka/KafkaUtils.java | 27 ++------------- .../service/source/kafka/KafkaSourceOperator.java | 39 ++++++++++++++++++++++ 7 files changed, 51 insertions(+), 76 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java index 093441768..0136a7d93 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java @@ -44,10 +44,6 @@ public class InlongKafkaDTO extends BaseInlongGroup { private Integer numPartitions; // replicationFactor number private Short replicationFactor = 1; - //consumer grouping - private String groupId; - // autocommit interval - private String autoCommit; /** * Get the dto instance from the request diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java index 20a699237..becfead26 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java @@ -40,10 +40,6 @@ public class InlongKafkaInfo extends InlongGroupInfo { private Integer numPartitions; // replicationFactor number private Short replicationFactor = 1; - //consumer grouping - private String groupId; - // autocommit interval - private String autoCommit; public InlongKafkaInfo() { this.setMqType(MQType.KAFKA); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java index 67cfe0550..d66e05ffd 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java @@ -39,10 +39,6 @@ public class InlongKafkaRequest extends InlongGroupRequest { private Integer numPartitions; // replicationFactor number private Short replicationFactor = 1; - //consumer grouping - private String groupId; - // autocommit interval - private String autoCommit; public InlongKafkaRequest() { this.setMqType(MQType.KAFKA); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java index 7e886cf95..6691b55d0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java @@ -17,12 +17,6 @@ package org.apache.inlong.manager.service.resource.queue.kafka; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; - import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo; import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo; import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl; @@ -30,12 +24,14 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ExecutionException; + /** * kafka operator, supports creating topics and creating subscription. */ @@ -77,23 +73,4 @@ public class KafkaOperator { return topicList.contains(topic); } - public void createSubscription(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo, - String subscription) { - KafkaConsumer kafkaConsumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo); - kafkaConsumer.subscribe(Collections.singletonList(subscription)); - } - - public boolean subscriptionIsExists(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo, - String topic) { - try (KafkaConsumer consumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo)) { - Map<String, List<PartitionInfo>> topics = consumer.listTopics(); - List<PartitionInfo> partitions = topics.get(topic); - if (partitions == null) { - LOGGER.info("subscription is not exist"); - return false; - } - return true; - } - } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java index 37a1ee66d..4e2b7246a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java @@ -79,7 +79,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { return; } for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.createKafkaTopic(inlongKafkaInfo, streamInfo.getInlongStreamId()); + this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource()); } } catch (Exception e) { String msg = String.format("failed to create kafka resource for groupId=%s", groupId); @@ -126,7 +126,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { try { InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo; // create kafka topic - this.createKafkaTopic(inlongKafkaInfo, streamInfo.getInlongStreamId()); + this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource()); } catch (Exception e) { String msg = String.format("failed to create kafka topic for groupId=%s, streamId=%s", groupId, streamId); log.error(msg, e); @@ -159,10 +159,9 @@ public class KafkaResourceOperators implements QueueResourceOperator { /** * Create Kafka Topic and Subscription, and save the consumer group info. */ - private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String streamId) throws Exception { + private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception { // 1. create kafka topic ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA); - String topicName = kafkaInfo.getInlongGroupId() + "_" + streamId; kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName); boolean exist = kafkaOperator.topicIsExists((KafkaClusterInfo) clusterInfo, topicName); @@ -172,17 +171,12 @@ public class KafkaResourceOperators implements QueueResourceOperator { throw new WorkflowListenerException("topic=" + topicName + " not exists in " + bootStrapServers); } - // 2. create a subscription for the kafka topic - kafkaOperator.createSubscription(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName); - String groupId = kafkaInfo.getInlongGroupId(); - log.info("success to create kafka subscription for groupId={}, topic={}, consumeGroup={}", - groupId, topicName, topicName); - - // 3. insert the consumer group info + // Kafka consumers do not need to register in advance + // 2. insert the consumer group info String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName); Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup); log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}", - id, consumeGroup, groupId, topicName); + id, consumeGroup, kafkaInfo.getInlongGroupId(), topicName); } /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java index d793b8ce4..a9bd98ced 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java @@ -17,15 +17,12 @@ package org.apache.inlong.manager.service.resource.queue.kafka; -import java.util.Properties; - import lombok.extern.slf4j.Slf4j; import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo; -import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.Properties; /** * kafka connection utils @@ -40,24 +37,4 @@ public class KafkaUtils { // Create AdminClient instance return AdminClient.create(properties); } - - public static KafkaConsumer createKafkaConsumer(InlongKafkaInfo inlongKafkaInfo,KafkaClusterInfo kafkaClusterInfo) { - Properties properties = new Properties(); - // The connected kafka cluster address - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl()); - // consumer grouping - properties.put(ConsumerConfig.GROUP_ID_CONFIG, inlongKafkaInfo.getGroupId()); - // Confirm Auto Commit - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - // autocommit interval - properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); - // Serialization - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.IntegerDeserializer"); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringDeserializer"); - // For different groupid to ensure that the previous message can be consumed, reset the offset - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return new KafkaConsumer(properties); - } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java index 17c14eea2..2ce0f1077 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java @@ -18,22 +18,33 @@ package org.apache.inlong.manager.service.source.kafka; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset; import org.apache.inlong.manager.pojo.source.kafka.KafkaSource; import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceDTO; import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.source.AbstractSourceOperator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; +import java.util.Objects; /** * kafka stream source operator @@ -43,6 +54,8 @@ public class KafkaSourceOperator extends AbstractSourceOperator { @Autowired private ObjectMapper objectMapper; + @Autowired + private InlongClusterService clusterService; @Override public Boolean accept(String sourceType) { @@ -82,4 +95,30 @@ public class KafkaSourceOperator extends AbstractSourceOperator { return source; } + @Override + public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, + List<StreamSource> streamSources) { + ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA); + KafkaClusterInfo kafkaClusterInfo = (KafkaClusterInfo) clusterInfo; + String bootstrapServers = kafkaClusterInfo.getUrl(); + + Map<String, List<StreamSource>> sourceMap = Maps.newHashMap(); + streamInfos.forEach(streamInfo -> { + KafkaSource kafkaSource = new KafkaSource(); + String streamId = streamInfo.getInlongStreamId(); + kafkaSource.setSourceName(streamId); + kafkaSource.setBootstrapServers(bootstrapServers); + kafkaSource.setTopic(streamInfo.getMqResource()); + for (StreamSource sourceInfo : streamSources) { + if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) { + continue; + } + kafkaSource.setSerializationType(sourceInfo.getSerializationType()); + } + kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName()); + kafkaSource.setFieldList(streamInfo.getFieldList()); + sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(kafkaSource); + }); + return sourceMap; + } }
