This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7644e71d58c79ed26e447b13a671309a83e2a0c3
Author: haifxu <[email protected]>
AuthorDate: Tue Nov 8 19:20:28 2022 +0800

    [INLONG-6451][Manager] Optimize the resource process of Kafka MQ (#6455)
---
 .../manager/pojo/group/kafka/InlongKafkaDTO.java   |  4 ---
 .../manager/pojo/group/kafka/InlongKafkaInfo.java  |  4 ---
 .../pojo/group/kafka/InlongKafkaRequest.java       |  4 ---
 .../resource/queue/kafka/KafkaOperator.java        | 31 +++--------------
 .../queue/kafka/KafkaResourceOperators.java        | 18 ++++------
 .../service/resource/queue/kafka/KafkaUtils.java   | 27 ++-------------
 .../service/source/kafka/KafkaSourceOperator.java  | 39 ++++++++++++++++++++++
 7 files changed, 51 insertions(+), 76 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
index 093441768..0136a7d93 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
@@ -44,10 +44,6 @@ public class InlongKafkaDTO extends BaseInlongGroup {
     private Integer numPartitions;
     // replicationFactor number
     private Short replicationFactor = 1;
-    //consumer grouping
-    private String groupId;
-    // autocommit interval
-    private String autoCommit;
 
     /**
      * Get the dto instance from the request
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
index 20a699237..becfead26 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
@@ -40,10 +40,6 @@ public class InlongKafkaInfo extends InlongGroupInfo {
     private Integer numPartitions;
     // replicationFactor number
     private Short replicationFactor = 1;
-    //consumer grouping
-    private String groupId;
-    // autocommit interval
-    private String autoCommit;
 
     public InlongKafkaInfo() {
         this.setMqType(MQType.KAFKA);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
index 67cfe0550..d66e05ffd 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
@@ -39,10 +39,6 @@ public class InlongKafkaRequest extends InlongGroupRequest {
     private Integer numPartitions;
     // replicationFactor number
     private Short replicationFactor = 1;
-    //consumer grouping
-    private String groupId;
-    // autocommit interval
-    private String autoCommit;
 
     public InlongKafkaRequest() {
         this.setMqType(MQType.KAFKA);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 7e886cf95..6691b55d0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -17,12 +17,6 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
@@ -30,12 +24,14 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
 /**
  * kafka operator, supports creating topics and creating subscription.
  */
@@ -77,23 +73,4 @@ public class KafkaOperator {
         return topicList.contains(topic);
     }
 
-    public void createSubscription(InlongKafkaInfo inlongKafkaInfo, 
KafkaClusterInfo kafkaClusterInfo,
-            String subscription) {
-        KafkaConsumer kafkaConsumer = 
KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo);
-        kafkaConsumer.subscribe(Collections.singletonList(subscription));
-    }
-
-    public boolean subscriptionIsExists(InlongKafkaInfo inlongKafkaInfo, 
KafkaClusterInfo kafkaClusterInfo,
-            String topic) {
-        try (KafkaConsumer consumer = 
KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo)) {
-            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
-            List<PartitionInfo> partitions = topics.get(topic);
-            if (partitions == null) {
-                LOGGER.info("subscription is not exist");
-                return false;
-            }
-            return true;
-        }
-    }
-
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 37a1ee66d..4e2b7246a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -79,7 +79,7 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
                 return;
             }
             for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, 
streamInfo.getInlongStreamId());
+                this.createKafkaTopic(inlongKafkaInfo, 
streamInfo.getMqResource());
             }
         } catch (Exception e) {
             String msg = String.format("failed to create kafka resource for 
groupId=%s", groupId);
@@ -126,7 +126,7 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
         try {
             InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
             // create kafka topic
-            this.createKafkaTopic(inlongKafkaInfo, 
streamInfo.getInlongStreamId());
+            this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
         } catch (Exception e) {
             String msg = String.format("failed to create kafka topic for 
groupId=%s, streamId=%s", groupId, streamId);
             log.error(msg, e);
@@ -159,10 +159,9 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
     /**
      * Create Kafka Topic and Subscription, and save the consumer group info.
      */
-    private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String streamId) 
throws Exception {
+    private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) 
throws Exception {
         // 1. create kafka topic
         ClusterInfo clusterInfo = 
clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        String topicName = kafkaInfo.getInlongGroupId() + "_" + streamId;
         kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, 
topicName);
 
         boolean exist = kafkaOperator.topicIsExists((KafkaClusterInfo) 
clusterInfo, topicName);
@@ -172,17 +171,12 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
             throw new WorkflowListenerException("topic=" + topicName + " not 
exists in " + bootStrapServers);
         }
 
-        // 2. create a subscription for the kafka topic
-        kafkaOperator.createSubscription(kafkaInfo, (KafkaClusterInfo) 
clusterInfo, topicName);
-        String groupId = kafkaInfo.getInlongGroupId();
-        log.info("success to create kafka subscription for groupId={}, 
topic={}, consumeGroup={}",
-                groupId, topicName, topicName);
-
-        // 3. insert the consumer group info
+        // Kafka consumers do not need to register in advance
+        // 2. insert the consumer group info
         String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, 
kafkaInfo.getInlongClusterTag(), topicName);
         Integer id = consumeService.saveBySystem(kafkaInfo, topicName, 
consumeGroup);
         log.info("success to save inlong consume [{}] for consumerGroup={}, 
groupId={}, topic={}",
-                id, consumeGroup, groupId, topicName);
+                id, consumeGroup, kafkaInfo.getInlongGroupId(), topicName);
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
index d793b8ce4..a9bd98ced 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
@@ -17,15 +17,12 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
-import java.util.Properties;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
-import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Properties;
 
 /**
  * kafka connection utils
@@ -40,24 +37,4 @@ public class KafkaUtils {
         // Create AdminClient instance
         return AdminClient.create(properties);
     }
-
-    public static KafkaConsumer createKafkaConsumer(InlongKafkaInfo 
inlongKafkaInfo,KafkaClusterInfo kafkaClusterInfo) {
-        Properties properties = new Properties();
-        // The connected kafka cluster address
-        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaClusterInfo.getUrl());
-        // consumer grouping
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
inlongKafkaInfo.getGroupId());
-        // Confirm Auto Commit
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        // autocommit interval
-        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-        // Serialization
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.IntegerDeserializer");
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringDeserializer");
-        // For different groupid to ensure that the previous message can be 
consumed, reset the offset
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return new KafkaConsumer(properties);
-    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 17c14eea2..2ce0f1077 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -18,22 +18,33 @@
 package org.apache.inlong.manager.service.source.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceDTO;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * kafka stream source operator
@@ -43,6 +54,8 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
 
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private InlongClusterService clusterService;
 
     @Override
     public Boolean accept(String sourceType) {
@@ -82,4 +95,30 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
         return source;
     }
 
+    @Override
+    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo 
groupInfo, List<InlongStreamInfo> streamInfos,
+            List<StreamSource> streamSources) {
+        ClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        KafkaClusterInfo kafkaClusterInfo = (KafkaClusterInfo) clusterInfo;
+        String bootstrapServers = kafkaClusterInfo.getUrl();
+
+        Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
+        streamInfos.forEach(streamInfo -> {
+            KafkaSource kafkaSource = new KafkaSource();
+            String streamId = streamInfo.getInlongStreamId();
+            kafkaSource.setSourceName(streamId);
+            kafkaSource.setBootstrapServers(bootstrapServers);
+            kafkaSource.setTopic(streamInfo.getMqResource());
+            for (StreamSource sourceInfo : streamSources) {
+                if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) 
{
+                    continue;
+                }
+                
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+            }
+            kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
+            kafkaSource.setFieldList(streamInfo.getFieldList());
+            sourceMap.computeIfAbsent(streamId, key -> 
Lists.newArrayList()).add(kafkaSource);
+        });
+        return sourceMap;
+    }
 }

Reply via email to