This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 90c89f1f9b [INLONG-8360][Manager] Support previewing data of Kafka 
(#8588)
90c89f1f9b is described below

commit 90c89f1f9ba21a1a5b65ae9519b70bac490efa16
Author: Hao <[email protected]>
AuthorDate: Mon Jul 31 16:10:24 2023 +0800

    [INLONG-8360][Manager] Support previewing data of Kafka (#8588)
    
    Co-authored-by: healchow <[email protected]>
---
 .../manager/plugin/flink/FlinkOperation.java       |   6 +-
 .../manager/pojo/consume/BriefMQMessage.java       |   8 +-
 .../message/InlongMsgDeserializeOperator.java      |  21 ++-
 .../service/message/PbMsgDeserializeOperator.java  |  25 ++--
 .../service/message/RawMsgDeserializeOperator.java |   7 +-
 .../resource/queue/kafka/KafkaOperator.java        | 121 ++++++++++++++++-
 ...rators.java => KafkaQueueResourceOperator.java} |  23 +++-
 .../resource/queue/pulsar/PulsarOperator.java      |   1 -
 ...rator.java => PulsarQueueResourceOperator.java} |   2 +-
 ...rator.java => TubeMQQueueResourceOperator.java} |   2 +-
 .../service/source/kafka/KafkaSourceOperator.java  |   2 +-
 .../source/pulsar/PulsarSourceOperator.java        |   2 +-
 .../resource/queue/kafka/KafkaOperatorTest.java    | 146 +++++++++++++++++++++
 13 files changed, 327 insertions(+), 39 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 817a9f8197..3ae90e6036 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -313,13 +313,13 @@ public class FlinkOperation {
         String jobId = flinkInfo.getJobId();
         JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
         if (jobDetailsInfo == null) {
-            throw new Exception(String.format("delete job failed as the job 
not found for %s", jobId));
+            throw new Exception(String.format("delete job failed as the job 
[%s] not found", jobId));
         }
 
         JobStatus jobStatus = jobDetailsInfo.getJobStatus();
         if (jobStatus != null && jobStatus.isTerminalState()) {
-            String message = String.format("not support delete %s as the task 
was terminated", jobId);
-            message = jobStatus.isGloballyTerminalState() ? message + " 
globally" : " locally";
+            String stateName = jobStatus.isGloballyTerminalState() ? 
"globally" : "locally";
+            String message = String.format("unsupported delete job [%s] as it 
was [%s] terminated", jobId, stateName);
             throw new Exception(message);
         }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
index a09c3c0b0c..51085c5570 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
@@ -25,16 +25,16 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /**
- * Inlong display message info
+ * Brief Message info for MQ
  */
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Inlong brief mq message info")
+@ApiModel("Brief Message info for MQ")
 public class BriefMQMessage {
 
-    @ApiModelProperty(value = "index id")
+    @ApiModelProperty(value = "Message index id")
     private Integer id;
 
     @ApiModelProperty(value = "Inlong group id")
@@ -43,7 +43,7 @@ public class BriefMQMessage {
     @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
 
-    @ApiModelProperty(value = "Date")
+    @ApiModelProperty(value = "Message date")
     private Long dt;
 
     @ApiModelProperty(value = "Client ip")
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index 9ae409425b..b5165eb8bf 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -44,22 +44,22 @@ public class InlongMsgDeserializeOperator implements 
DeserializeOperator {
     }
 
     @Override
-    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
-            byte[] msgBytes, Map<String, String> headers, int index) throws 
Exception {
+    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[] 
msgBytes, Map<String, String> headers,
+            int index) {
         String groupId = headers.get(AttributeConstants.GROUP_ID);
         String streamId = headers.get(AttributeConstants.STREAM_ID);
         List<BriefMQMessage> messageList = new ArrayList<>();
         InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
         for (String attr : inLongMsg.getAttrs()) {
-            Map<String, String> attributes = StringUtil.splitKv(attr, 
INLONGMSG_ATTR_ENTRY_DELIMITER,
+            Map<String, String> attrMap = StringUtil.splitKv(attr, 
INLONGMSG_ATTR_ENTRY_DELIMITER,
                     INLONGMSG_ATTR_KV_DELIMITER, null, null);
             // Extracts time from the attributes
             long msgTime;
-            if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
-                String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+            if (attrMap.containsKey(INLONGMSG_ATTR_TIME_T)) {
+                String date = attrMap.get(INLONGMSG_ATTR_TIME_T).trim();
                 msgTime = StringUtil.parseDateTime(date);
-            } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
-                String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+            } else if (attrMap.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+                String epoch = attrMap.get(INLONGMSG_ATTR_TIME_DT).trim();
                 msgTime = Long.parseLong(epoch);
             } else {
                 throw new 
IllegalArgumentException(String.format("PARSE_ATTR_ERROR_STRING%s",
@@ -71,10 +71,9 @@ public class InlongMsgDeserializeOperator implements 
DeserializeOperator {
                 if (Objects.isNull(bodyBytes)) {
                     continue;
                 }
-                BriefMQMessage inLongMessage =
-                        new BriefMQMessage(index, groupId, streamId, msgTime, 
attributes.get(CLIENT_IP),
-                                new String(bodyBytes, 
Charset.forName(streamInfo.getDataEncoding())));
-                messageList.add(inLongMessage);
+                BriefMQMessage message = new BriefMQMessage(index, groupId, 
streamId, msgTime, attrMap.get(CLIENT_IP),
+                        new String(bodyBytes, 
Charset.forName(streamInfo.getDataEncoding())));
+                messageList.add(message);
             }
         }
         return messageList;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index 8a9320237d..19742325f8 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -48,7 +48,6 @@ public class PbMsgDeserializeOperator implements 
DeserializeOperator {
     @Override
     public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
             byte[] msgBytes, Map<String, String> headers, int index) throws 
Exception {
-        List<BriefMQMessage> messageList = new ArrayList<>();
         int compressType = 
Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0"));
         byte[] values = msgBytes;
         switch (compressType) {
@@ -63,27 +62,33 @@ public class PbMsgDeserializeOperator implements 
DeserializeOperator {
             default:
                 throw new IllegalArgumentException("Unknown compress type:" + 
compressType);
         }
-        messageList = transformMessageObjs(MessageObjs.parseFrom(values), 
streamInfo, index);
-        return messageList;
+        return transformMessageObjs(MessageObjs.parseFrom(values), streamInfo, 
index);
     }
 
     private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs, 
InlongStreamInfo streamInfo, int index) {
         if (null == messageObjs) {
             return null;
         }
-        List<BriefMQMessage> briefMQMessages = new ArrayList<>();
+
+        List<BriefMQMessage> messageList = new ArrayList<>();
         for (MessageObj messageObj : messageObjs.getMsgsList()) {
             List<MapFieldEntry> mapFieldEntries = messageObj.getParamsList();
             Map<String, String> headers = new HashMap<>();
             for (MapFieldEntry mapFieldEntry : mapFieldEntries) {
                 headers.put(mapFieldEntry.getKey(), mapFieldEntry.getValue());
             }
-            BriefMQMessage briefMQMessage = new BriefMQMessage(index, 
headers.get(AttributeConstants.GROUP_ID),
-                    headers.get(AttributeConstants.STREAM_ID), 
messageObj.getMsgTime(),
-                    headers.get(CLIENT_IP),
-                    new String(messageObj.getBody().toByteArray(), 
Charset.forName(streamInfo.getDataEncoding())));
-            briefMQMessages.add(briefMQMessage);
+
+            BriefMQMessage message = BriefMQMessage.builder()
+                    .id(index)
+                    .inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
+                    .inlongStreamId(headers.get(AttributeConstants.STREAM_ID))
+                    .dt(messageObj.getMsgTime())
+                    .clientIp(headers.get(CLIENT_IP))
+                    .body(new String(messageObj.getBody().toByteArray(), 
Charset.forName(streamInfo.getDataEncoding())))
+                    .build();
+            messageList.add(message);
         }
-        return briefMQMessages;
+
+        return messageList;
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index de4b4a628b..cf334ebcc8 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -41,13 +41,12 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
 
     @Override
     public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
-            byte[] msgBytes, Map<String, String> headers, int index) throws 
Exception {
+            byte[] msgBytes, Map<String, String> headers, int index) {
         String groupId = headers.get(AttributeConstants.GROUP_ID);
         String streamId = headers.get(AttributeConstants.STREAM_ID);
         long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
-        return Collections
-                .singletonList(new BriefMQMessage(null, groupId, streamId, 
msgTime, headers.get(CLIENT_IP),
-                        new String(msgBytes, 
Charset.forName(streamInfo.getDataEncoding()))));
+        return Collections.singletonList(new BriefMQMessage(index, groupId, 
streamId, msgTime,
+                headers.get(CLIENT_IP), new String(msgBytes, 
Charset.forName(streamInfo.getDataEncoding()))));
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index af67f02b10..82b2d7a03b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -17,21 +17,48 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
+import org.apache.inlong.manager.service.message.DeserializeOperator;
+import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 /**
  * kafka operator, supports creating topics and creating subscription.
@@ -41,6 +68,9 @@ public class KafkaOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
 
+    @Autowired
+    public DeserializeOperatorFactory deserializeOperatorFactory;
+
     /**
      * Create Kafka topic inlongKafkaInfo
      */
@@ -69,7 +99,7 @@ public class KafkaOperator {
     public void forceDeleteTopic(KafkaClusterInfo kafkaClusterInfo, String 
topicName) {
         AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
         DeleteTopicsResult result = 
adminClient.deleteTopics(Collections.singletonList(topicName));
-        LOGGER.info("success to delete topic={}", topicName);
+        LOGGER.info("success to delete topic={}, result: {}", topicName, 
result.all());
     }
 
     public boolean topicIsExists(KafkaClusterInfo kafkaClusterInfo, String 
topic)
@@ -79,4 +109,93 @@ public class KafkaOperator {
         return topicList.contains(topic);
     }
 
+    /**
+     * Query topic message for the given Kafka cluster.
+     */
+    public List<BriefMQMessage> queryLatestMessage(KafkaClusterInfo 
clusterInfo, String topicName,
+            String consumeGroup, Integer messageCount, InlongStreamInfo 
streamInfo) {
+        LOGGER.debug("begin to query message for topic {} in cluster: {}", 
topicName, clusterInfo);
+
+        Properties properties = getProperties(clusterInfo.getUrl(), 
consumeGroup);
+        KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(properties);
+        return getLatestMessage(consumer, topicName, messageCount, streamInfo);
+    }
+
+    @VisibleForTesting
+    public List<BriefMQMessage> getLatestMessage(Consumer<byte[], byte[]> 
consumer, String topicName,
+            Integer messageCount, InlongStreamInfo streamInfo) {
+        List<BriefMQMessage> messageList = new ArrayList<>();
+
+        try {
+            List<PartitionInfo> partitionInfoList = 
consumer.partitionsFor(topicName);
+            List<TopicPartition> topicPartitionList = 
partitionInfoList.stream()
+                    .map(topicPartition -> new 
TopicPartition(topicPartition.topic(), topicPartition.partition()))
+                    .collect(Collectors.toList());
+
+            Map<TopicPartition, Long> beginningTopicPartitionList = 
consumer.beginningOffsets(topicPartitionList);
+            Map<TopicPartition, Long> endTopicPartitionList = 
consumer.endOffsets(topicPartitionList);
+
+            int count = (int) Math.ceil((double) messageCount / 
topicPartitionList.size());
+            Map<TopicPartition, Long> expectedOffsetMap = 
beginningTopicPartitionList.entrySet()
+                    .stream()
+                    .map(entry -> {
+                        long beginningOffset = entry.getValue();
+                        long endOffset = 
endTopicPartitionList.getOrDefault(entry.getKey(), beginningOffset);
+                        Long offset = (endOffset - beginningOffset) >= count ? 
(endOffset - count) : beginningOffset;
+                        return Pair.of(entry.getKey(), offset);
+                    }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+            consumer.assign(topicPartitionList);
+            expectedOffsetMap.forEach(consumer::seek);
+
+            int index = 0;
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                Map<String, String> headers = new HashMap<>();
+                for (Header header : record.headers()) {
+                    headers.put(header.key(), new String(header.value(), 
StandardCharsets.UTF_8));
+                }
+
+                int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
+                        
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
+                        DataProxyMsgEncType.valueOf(wrapTypeId));
+                messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
record.value(), headers, index));
+                if (messageList.size() >= messageCount) {
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            String errMsg = "decode msg error: ";
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg + e.getMessage());
+        } finally {
+            consumer.close();
+        }
+
+        LOGGER.debug("success query messages for topic={}, size={}, returned 
size={}",
+                topicName, messageList.size(), messageCount);
+        // only return a list of messages of the specified count
+        int fromIndex = (messageList.size() > messageCount) ? 
(messageList.size() - messageCount) : 0;
+        List<BriefMQMessage> resultList = messageList.subList(fromIndex, 
messageList.size());
+        for (int i = 0; i < resultList.size(); i++) {
+            BriefMQMessage message = resultList.get(i);
+            message.setId(i + 1);
+        }
+
+        return resultList;
+    }
+
+    /**
+     * Get a properties instance of consumer group.
+     */
+    private static Properties getProperties(String clusterUrl, String 
consumeGroup) {
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterUrl);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
+
+        return properties;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
similarity index 88%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
index d5d01c9315..bfc042b6de 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
@@ -25,6 +25,7 @@ import 
org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -49,13 +50,15 @@ import java.util.List;
  */
 @Slf4j
 @Service
-public class KafkaResourceOperators implements QueueResourceOperator {
+public class KafkaQueueResourceOperator implements QueueResourceOperator {
 
     /**
      * The name rule for Kafka consumer group: 
clusterTag_topicName_consumer_group
      */
     public static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
 
+    public static final String KAFKA_CONSUMER_GROUP_REALTIME_REVIEW = 
"%s_%s_consumer_group_realtime_review";
+
     @Autowired
     private KafkaOperator kafkaOperator;
     @Autowired
@@ -184,4 +187,22 @@ public class KafkaResourceOperators implements 
QueueResourceOperator {
         ClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
         kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, 
topicName);
     }
+
+    @Override
+    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo,
+            Integer messageCount) {
+        ClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+
+        String topicName = streamInfo.getMqResource();
+        if (topicName.equals(streamInfo.getInlongStreamId())) {
+            // the default mq resource (stream id) is not sufficient to 
discriminate different kafka topics
+            topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+                    groupInfo.getMqResource(), streamInfo.getMqResource());
+        }
+
+        String consumeGroup =
+                String.format(KAFKA_CONSUMER_GROUP_REALTIME_REVIEW, 
groupInfo.getInlongClusterTag(), topicName);
+        return kafkaOperator.queryLatestMessage((KafkaClusterInfo) 
clusterInfo, topicName, consumeGroup, messageCount,
+                streamInfo);
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 227a95bb3c..470ec417b4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -405,7 +405,6 @@ public class PulsarOperator {
                 String errMsg = "decode msg error: ";
                 LOGGER.error(errMsg, e);
                 throw new BusinessException(errMsg + e.getMessage());
-
             }
         }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
similarity index 99%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index f08faeaced..0746fac320 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -57,7 +57,7 @@ import java.util.List;
  */
 @Slf4j
 @Service
-public class PulsarResourceOperator implements QueueResourceOperator {
+public class PulsarQueueResourceOperator implements QueueResourceOperator {
 
     /**
      * The name rule for Pulsar subscription: 
clusterTag_topicName_sinkId_consumer_group
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
similarity index 98%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
index 1baa970179..0fa47ea3eb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
@@ -43,7 +43,7 @@ import java.util.List;
  */
 @Slf4j
 @Service
-public class TubeMQResourceOperator implements QueueResourceOperator {
+public class TubeMQQueueResourceOperator implements QueueResourceOperator {
 
     @Autowired
     private InlongClusterService clusterService;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index f2d404f552..0037170e79 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -50,7 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static 
org.apache.inlong.manager.service.resource.queue.kafka.KafkaResourceOperators.KAFKA_CONSUMER_GROUP;
+import static 
org.apache.inlong.manager.service.resource.queue.kafka.KafkaQueueResourceOperator.KAFKA_CONSUMER_GROUP;
 
 /**
  * kafka stream source operator
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index c93d236954..6aa998962a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -53,7 +53,7 @@ import org.springframework.stereotype.Service;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarResourceOperator.PULSAR_SUBSCRIPTION;
+import static 
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
 
 /**
  * Pulsar stream source operator
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
new file mode 100644
index 0000000000..d2cf618b3d
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Test for {@link KafkaOperator}
+ */
+public class KafkaOperatorTest extends ServiceBaseTest {
+
+    private static final String TOPIC_NAME = "test";
+    private static final int PARTITION_NUM = 5;
+
+    private final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+    private final InlongStreamInfo streamInfo = new InlongStreamInfo();
+
+    @Autowired
+    private KafkaOperator kafkaOperator;
+
+    public byte[] buildInlongMessage(String message) {
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        String attr = "t=20230728";
+        inLongMsg.addMsg(attr, message.getBytes(StandardCharsets.UTF_8));
+        return inLongMsg.buildArray();
+    }
+
+    public void addOnceRecord(int partition, long offset, String message) {
+        ConsumerRecord<byte[], byte[]> record =
+                new ConsumerRecord<>(TOPIC_NAME, partition, offset, 0L, 
TimestampType.CREATE_TIME,
+                        0L, 0, 0, null, buildInlongMessage(message), new 
RecordHeaders(), Optional.empty());
+        consumer.addRecord(record);
+
+        HashMap<TopicPartition, Long> endingOffsets = new HashMap<>();
+        endingOffsets.put(new TopicPartition(TOPIC_NAME, partition), offset);
+        consumer.updateEndOffsets(endingOffsets);
+    }
+
+    public void addRecord(List<String> messages) {
+        List<List<String>> partitions =
+                Lists.partition(messages, (int) Math.ceil((double) 
messages.size() / PARTITION_NUM));
+
+        IntStream.range(0, partitions.size()).forEach(index -> {
+            List<String> partitionItems = partitions.get(index);
+            for (int offset = 0; offset < partitionItems.size(); offset++) {
+                addOnceRecord(index, offset + 1, partitionItems.get(offset));
+            }
+        });
+    }
+
+    @BeforeEach
+    public void setUp() {
+        streamInfo.setDataEncoding("UTF-8");
+
+        List<TopicPartition> topicPartitions = IntStream.range(0, 
PARTITION_NUM)
+                .mapToObj(i -> new TopicPartition(TOPIC_NAME, 
i)).collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+
+        List<PartitionInfo> partitions = IntStream.range(0, PARTITION_NUM)
+                .mapToObj(i -> new PartitionInfo(TOPIC_NAME, i, null, null, 
null)).collect(Collectors.toList());
+        consumer.updatePartitions(TOPIC_NAME, partitions);
+
+        Map<TopicPartition, Long> offsets = topicPartitions.stream()
+                .collect(Collectors.toMap(Function.identity(), t -> 0L));
+        consumer.updateBeginningOffsets(offsets);
+        consumer.updateEndOffsets(offsets);
+
+        topicPartitions.forEach(item -> consumer.seek(item, 0));
+    }
+
+    @Test
+    void testGetKafkaLatestMessage() {
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        Assertions.assertEquals(0, messages.size());
+    }
+
+    @Test
+    void testGetKafkaLatestMessage_1() {
+        addRecord(Collections.singletonList("inlong"));
+
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        Assertions.assertEquals(1, messages.size());
+        Assertions.assertEquals("inlong", messages.get(0).getBody());
+    }
+
+    @Test
+    void testGetKafkaLatestMessage_2() {
+        List<String> records = IntStream.range(0, 9).mapToObj(index -> "name_" 
+ index).collect(Collectors.toList());
+        addRecord(records);
+
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        Assertions.assertEquals(9, messages.size());
+    }
+
+    @Test
+    void testGetKafkaLatestMessage_4() {
+        List<String> records = IntStream.range(0, 21).mapToObj(index -> 
"name_" + index).collect(Collectors.toList());
+        addRecord(records);
+
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        Assertions.assertEquals(10, messages.size());
+    }
+
+}


Reply via email to