This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 90c89f1f9b [INLONG-8360][Manager] Support previewing data of Kafka
(#8588)
90c89f1f9b is described below
commit 90c89f1f9ba21a1a5b65ae9519b70bac490efa16
Author: Hao <[email protected]>
AuthorDate: Mon Jul 31 16:10:24 2023 +0800
[INLONG-8360][Manager] Support previewing data of Kafka (#8588)
Co-authored-by: healchow <[email protected]>
---
.../manager/plugin/flink/FlinkOperation.java | 6 +-
.../manager/pojo/consume/BriefMQMessage.java | 8 +-
.../message/InlongMsgDeserializeOperator.java | 21 ++-
.../service/message/PbMsgDeserializeOperator.java | 25 ++--
.../service/message/RawMsgDeserializeOperator.java | 7 +-
.../resource/queue/kafka/KafkaOperator.java | 121 ++++++++++++++++-
...rators.java => KafkaQueueResourceOperator.java} | 23 +++-
.../resource/queue/pulsar/PulsarOperator.java | 1 -
...rator.java => PulsarQueueResourceOperator.java} | 2 +-
...rator.java => TubeMQQueueResourceOperator.java} | 2 +-
.../service/source/kafka/KafkaSourceOperator.java | 2 +-
.../source/pulsar/PulsarSourceOperator.java | 2 +-
.../resource/queue/kafka/KafkaOperatorTest.java | 146 +++++++++++++++++++++
13 files changed, 327 insertions(+), 39 deletions(-)
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 817a9f8197..3ae90e6036 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -313,13 +313,13 @@ public class FlinkOperation {
String jobId = flinkInfo.getJobId();
JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
if (jobDetailsInfo == null) {
- throw new Exception(String.format("delete job failed as the job
not found for %s", jobId));
+ throw new Exception(String.format("delete job failed as the job
[%s] not found", jobId));
}
JobStatus jobStatus = jobDetailsInfo.getJobStatus();
if (jobStatus != null && jobStatus.isTerminalState()) {
- String message = String.format("not support delete %s as the task
was terminated", jobId);
- message = jobStatus.isGloballyTerminalState() ? message + "
globally" : " locally";
+ String stateName = jobStatus.isGloballyTerminalState() ?
"globally" : "locally";
+ String message = String.format("unsupported delete job [%s] as it
was [%s] terminated", jobId, stateName);
throw new Exception(message);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
index a09c3c0b0c..51085c5570 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
@@ -25,16 +25,16 @@ import lombok.Data;
import lombok.NoArgsConstructor;
/**
- * Inlong display message info
+ * Brief Message info for MQ
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Inlong brief mq message info")
+@ApiModel("Brief Message info for MQ")
public class BriefMQMessage {
- @ApiModelProperty(value = "index id")
+ @ApiModelProperty(value = "Message index id")
private Integer id;
@ApiModelProperty(value = "Inlong group id")
@@ -43,7 +43,7 @@ public class BriefMQMessage {
@ApiModelProperty(value = "Inlong stream id")
private String inlongStreamId;
- @ApiModelProperty(value = "Date")
+ @ApiModelProperty(value = "Message date")
private Long dt;
@ApiModelProperty(value = "Client ip")
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index 9ae409425b..b5165eb8bf 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -44,22 +44,22 @@ public class InlongMsgDeserializeOperator implements
DeserializeOperator {
}
@Override
- public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
- byte[] msgBytes, Map<String, String> headers, int index) throws
Exception {
+ public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[]
msgBytes, Map<String, String> headers,
+ int index) {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
List<BriefMQMessage> messageList = new ArrayList<>();
InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
for (String attr : inLongMsg.getAttrs()) {
- Map<String, String> attributes = StringUtil.splitKv(attr,
INLONGMSG_ATTR_ENTRY_DELIMITER,
+ Map<String, String> attrMap = StringUtil.splitKv(attr,
INLONGMSG_ATTR_ENTRY_DELIMITER,
INLONGMSG_ATTR_KV_DELIMITER, null, null);
// Extracts time from the attributes
long msgTime;
- if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
- String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+ if (attrMap.containsKey(INLONGMSG_ATTR_TIME_T)) {
+ String date = attrMap.get(INLONGMSG_ATTR_TIME_T).trim();
msgTime = StringUtil.parseDateTime(date);
- } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
- String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+ } else if (attrMap.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+ String epoch = attrMap.get(INLONGMSG_ATTR_TIME_DT).trim();
msgTime = Long.parseLong(epoch);
} else {
throw new
IllegalArgumentException(String.format("PARSE_ATTR_ERROR_STRING%s",
@@ -71,10 +71,9 @@ public class InlongMsgDeserializeOperator implements
DeserializeOperator {
if (Objects.isNull(bodyBytes)) {
continue;
}
- BriefMQMessage inLongMessage =
- new BriefMQMessage(index, groupId, streamId, msgTime,
attributes.get(CLIENT_IP),
- new String(bodyBytes,
Charset.forName(streamInfo.getDataEncoding())));
- messageList.add(inLongMessage);
+ BriefMQMessage message = new BriefMQMessage(index, groupId,
streamId, msgTime, attrMap.get(CLIENT_IP),
+ new String(bodyBytes,
Charset.forName(streamInfo.getDataEncoding())));
+ messageList.add(message);
}
}
return messageList;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index 8a9320237d..19742325f8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -48,7 +48,6 @@ public class PbMsgDeserializeOperator implements
DeserializeOperator {
@Override
public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
byte[] msgBytes, Map<String, String> headers, int index) throws
Exception {
- List<BriefMQMessage> messageList = new ArrayList<>();
int compressType =
Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0"));
byte[] values = msgBytes;
switch (compressType) {
@@ -63,27 +62,33 @@ public class PbMsgDeserializeOperator implements
DeserializeOperator {
default:
throw new IllegalArgumentException("Unknown compress type:" +
compressType);
}
- messageList = transformMessageObjs(MessageObjs.parseFrom(values),
streamInfo, index);
- return messageList;
+ return transformMessageObjs(MessageObjs.parseFrom(values), streamInfo,
index);
}
private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs,
InlongStreamInfo streamInfo, int index) {
if (null == messageObjs) {
return null;
}
- List<BriefMQMessage> briefMQMessages = new ArrayList<>();
+
+ List<BriefMQMessage> messageList = new ArrayList<>();
for (MessageObj messageObj : messageObjs.getMsgsList()) {
List<MapFieldEntry> mapFieldEntries = messageObj.getParamsList();
Map<String, String> headers = new HashMap<>();
for (MapFieldEntry mapFieldEntry : mapFieldEntries) {
headers.put(mapFieldEntry.getKey(), mapFieldEntry.getValue());
}
- BriefMQMessage briefMQMessage = new BriefMQMessage(index,
headers.get(AttributeConstants.GROUP_ID),
- headers.get(AttributeConstants.STREAM_ID),
messageObj.getMsgTime(),
- headers.get(CLIENT_IP),
- new String(messageObj.getBody().toByteArray(),
Charset.forName(streamInfo.getDataEncoding())));
- briefMQMessages.add(briefMQMessage);
+
+ BriefMQMessage message = BriefMQMessage.builder()
+ .id(index)
+ .inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
+ .inlongStreamId(headers.get(AttributeConstants.STREAM_ID))
+ .dt(messageObj.getMsgTime())
+ .clientIp(headers.get(CLIENT_IP))
+ .body(new String(messageObj.getBody().toByteArray(),
Charset.forName(streamInfo.getDataEncoding())))
+ .build();
+ messageList.add(message);
}
- return briefMQMessages;
+
+ return messageList;
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index de4b4a628b..cf334ebcc8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -41,13 +41,12 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
@Override
public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
- byte[] msgBytes, Map<String, String> headers, int index) throws
Exception {
+ byte[] msgBytes, Map<String, String> headers, int index) {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
- return Collections
- .singletonList(new BriefMQMessage(null, groupId, streamId,
msgTime, headers.get(CLIENT_IP),
- new String(msgBytes,
Charset.forName(streamInfo.getDataEncoding()))));
+ return Collections.singletonList(new BriefMQMessage(index, groupId,
streamId, msgTime,
+ headers.get(CLIENT_IP), new String(msgBytes,
Charset.forName(streamInfo.getDataEncoding()))));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index af67f02b10..82b2d7a03b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -17,21 +17,48 @@
package org.apache.inlong.manager.service.resource.queue.kafka;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
+import org.apache.inlong.manager.service.message.DeserializeOperator;
+import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
/**
* kafka operator, supports creating topics and creating subscription.
@@ -41,6 +68,9 @@ public class KafkaOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
+ @Autowired
+ public DeserializeOperatorFactory deserializeOperatorFactory;
+
/**
* Create Kafka topic inlongKafkaInfo
*/
@@ -69,7 +99,7 @@ public class KafkaOperator {
public void forceDeleteTopic(KafkaClusterInfo kafkaClusterInfo, String
topicName) {
AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
DeleteTopicsResult result =
adminClient.deleteTopics(Collections.singletonList(topicName));
- LOGGER.info("success to delete topic={}", topicName);
+ LOGGER.info("success to delete topic={}, result: {}", topicName,
result.all());
}
public boolean topicIsExists(KafkaClusterInfo kafkaClusterInfo, String
topic)
@@ -79,4 +109,93 @@ public class KafkaOperator {
return topicList.contains(topic);
}
+ /**
+ * Query topic message for the given Kafka cluster.
+ */
+ public List<BriefMQMessage> queryLatestMessage(KafkaClusterInfo
clusterInfo, String topicName,
+ String consumeGroup, Integer messageCount, InlongStreamInfo
streamInfo) {
+ LOGGER.debug("begin to query message for topic {} in cluster: {}",
topicName, clusterInfo);
+
+ Properties properties = getProperties(clusterInfo.getUrl(),
consumeGroup);
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(properties);
+ return getLatestMessage(consumer, topicName, messageCount, streamInfo);
+ }
+
+ @VisibleForTesting
+ public List<BriefMQMessage> getLatestMessage(Consumer<byte[], byte[]>
consumer, String topicName,
+ Integer messageCount, InlongStreamInfo streamInfo) {
+ List<BriefMQMessage> messageList = new ArrayList<>();
+
+ try {
+ List<PartitionInfo> partitionInfoList =
consumer.partitionsFor(topicName);
+ List<TopicPartition> topicPartitionList =
partitionInfoList.stream()
+ .map(topicPartition -> new
TopicPartition(topicPartition.topic(), topicPartition.partition()))
+ .collect(Collectors.toList());
+
+ Map<TopicPartition, Long> beginningTopicPartitionList =
consumer.beginningOffsets(topicPartitionList);
+ Map<TopicPartition, Long> endTopicPartitionList =
consumer.endOffsets(topicPartitionList);
+
+ int count = (int) Math.ceil((double) messageCount /
topicPartitionList.size());
+ Map<TopicPartition, Long> expectedOffsetMap =
beginningTopicPartitionList.entrySet()
+ .stream()
+ .map(entry -> {
+ long beginningOffset = entry.getValue();
+ long endOffset =
endTopicPartitionList.getOrDefault(entry.getKey(), beginningOffset);
+ Long offset = (endOffset - beginningOffset) >= count ?
(endOffset - count) : beginningOffset;
+ return Pair.of(entry.getKey(), offset);
+ }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ consumer.assign(topicPartitionList);
+ expectedOffsetMap.forEach(consumer::seek);
+
+ int index = 0;
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ Map<String, String> headers = new HashMap<>();
+ for (Header header : record.headers()) {
+ headers.put(header.key(), new String(header.value(),
StandardCharsets.UTF_8));
+ }
+
+ int wrapTypeId =
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
+
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+ DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
+ DataProxyMsgEncType.valueOf(wrapTypeId));
+ messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
record.value(), headers, index));
+ if (messageList.size() >= messageCount) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ String errMsg = "decode msg error: ";
+ LOGGER.error(errMsg, e);
+ throw new BusinessException(errMsg + e.getMessage());
+ } finally {
+ consumer.close();
+ }
+
+ LOGGER.debug("success query messages for topic={}, size={}, returned
size={}",
+ topicName, messageList.size(), messageCount);
+ // only return a list of messages of the specified count
+ int fromIndex = (messageList.size() > messageCount) ?
(messageList.size() - messageCount) : 0;
+ List<BriefMQMessage> resultList = messageList.subList(fromIndex,
messageList.size());
+ for (int i = 0; i < resultList.size(); i++) {
+ BriefMQMessage message = resultList.get(i);
+ message.setId(i + 1);
+ }
+
+ return resultList;
+ }
+
+ /**
+ * Get a properties instance of consumer group.
+ */
+ private static Properties getProperties(String clusterUrl, String
consumeGroup) {
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterUrl);
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
+
+ return properties;
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
similarity index 88%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
index d5d01c9315..bfc042b6de 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
@@ -25,6 +25,7 @@ import
org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -49,13 +50,15 @@ import java.util.List;
*/
@Slf4j
@Service
-public class KafkaResourceOperators implements QueueResourceOperator {
+public class KafkaQueueResourceOperator implements QueueResourceOperator {
/**
* The name rule for Kafka consumer group:
clusterTag_topicName_consumer_group
*/
public static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
+ public static final String KAFKA_CONSUMER_GROUP_REALTIME_REVIEW =
"%s_%s_consumer_group_realtime_review";
+
@Autowired
private KafkaOperator kafkaOperator;
@Autowired
@@ -184,4 +187,22 @@ public class KafkaResourceOperators implements
QueueResourceOperator {
ClusterInfo clusterInfo =
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo,
topicName);
}
+
+ @Override
+ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo,
+ Integer messageCount) {
+ ClusterInfo clusterInfo =
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+
+ String topicName = streamInfo.getMqResource();
+ if (topicName.equals(streamInfo.getInlongStreamId())) {
+ // the default mq resource (stream id) is not sufficient to
discriminate different kafka topics
+ topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+ groupInfo.getMqResource(), streamInfo.getMqResource());
+ }
+
+ String consumeGroup =
+ String.format(KAFKA_CONSUMER_GROUP_REALTIME_REVIEW,
groupInfo.getInlongClusterTag(), topicName);
+ return kafkaOperator.queryLatestMessage((KafkaClusterInfo)
clusterInfo, topicName, consumeGroup, messageCount,
+ streamInfo);
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 227a95bb3c..470ec417b4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -405,7 +405,6 @@ public class PulsarOperator {
String errMsg = "decode msg error: ";
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg + e.getMessage());
-
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
similarity index 99%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index f08faeaced..0746fac320 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -57,7 +57,7 @@ import java.util.List;
*/
@Slf4j
@Service
-public class PulsarResourceOperator implements QueueResourceOperator {
+public class PulsarQueueResourceOperator implements QueueResourceOperator {
/**
* The name rule for Pulsar subscription:
clusterTag_topicName_sinkId_consumer_group
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
similarity index 98%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
index 1baa970179..0fa47ea3eb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
@@ -43,7 +43,7 @@ import java.util.List;
*/
@Slf4j
@Service
-public class TubeMQResourceOperator implements QueueResourceOperator {
+public class TubeMQQueueResourceOperator implements QueueResourceOperator {
@Autowired
private InlongClusterService clusterService;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index f2d404f552..0037170e79 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -50,7 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-import static
org.apache.inlong.manager.service.resource.queue.kafka.KafkaResourceOperators.KAFKA_CONSUMER_GROUP;
+import static
org.apache.inlong.manager.service.resource.queue.kafka.KafkaQueueResourceOperator.KAFKA_CONSUMER_GROUP;
/**
* kafka stream source operator
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index c93d236954..6aa998962a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -53,7 +53,7 @@ import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
-import static
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarResourceOperator.PULSAR_SUBSCRIPTION;
+import static
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
/**
* Pulsar stream source operator
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
new file mode 100644
index 0000000000..d2cf618b3d
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Test for {@link KafkaOperator}
+ */
+public class KafkaOperatorTest extends ServiceBaseTest {
+
+ private static final String TOPIC_NAME = "test";
+ private static final int PARTITION_NUM = 5;
+
+ private final MockConsumer<byte[], byte[]> consumer = new
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+ private final InlongStreamInfo streamInfo = new InlongStreamInfo();
+
+ @Autowired
+ private KafkaOperator kafkaOperator;
+
+ public byte[] buildInlongMessage(String message) {
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attr = "t=20230728";
+ inLongMsg.addMsg(attr, message.getBytes(StandardCharsets.UTF_8));
+ return inLongMsg.buildArray();
+ }
+
+ public void addOnceRecord(int partition, long offset, String message) {
+ ConsumerRecord<byte[], byte[]> record =
+ new ConsumerRecord<>(TOPIC_NAME, partition, offset, 0L,
TimestampType.CREATE_TIME,
+ 0L, 0, 0, null, buildInlongMessage(message), new
RecordHeaders(), Optional.empty());
+ consumer.addRecord(record);
+
+ HashMap<TopicPartition, Long> endingOffsets = new HashMap<>();
+ endingOffsets.put(new TopicPartition(TOPIC_NAME, partition), offset);
+ consumer.updateEndOffsets(endingOffsets);
+ }
+
+ public void addRecord(List<String> messages) {
+ List<List<String>> partitions =
+ Lists.partition(messages, (int) Math.ceil((double)
messages.size() / PARTITION_NUM));
+
+ IntStream.range(0, partitions.size()).forEach(index -> {
+ List<String> partitionItems = partitions.get(index);
+ for (int offset = 0; offset < partitionItems.size(); offset++) {
+ addOnceRecord(index, offset + 1, partitionItems.get(offset));
+ }
+ });
+ }
+
+ @BeforeEach
+ public void setUp() {
+ streamInfo.setDataEncoding("UTF-8");
+
+ List<TopicPartition> topicPartitions = IntStream.range(0,
PARTITION_NUM)
+ .mapToObj(i -> new TopicPartition(TOPIC_NAME,
i)).collect(Collectors.toList());
+ consumer.assign(topicPartitions);
+
+ List<PartitionInfo> partitions = IntStream.range(0, PARTITION_NUM)
+ .mapToObj(i -> new PartitionInfo(TOPIC_NAME, i, null, null,
null)).collect(Collectors.toList());
+ consumer.updatePartitions(TOPIC_NAME, partitions);
+
+ Map<TopicPartition, Long> offsets = topicPartitions.stream()
+ .collect(Collectors.toMap(Function.identity(), t -> 0L));
+ consumer.updateBeginningOffsets(offsets);
+ consumer.updateEndOffsets(offsets);
+
+ topicPartitions.forEach(item -> consumer.seek(item, 0));
+ }
+
+ @Test
+ void testGetKafkaLatestMessage() {
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ Assertions.assertEquals(0, messages.size());
+ }
+
+ @Test
+ void testGetKafkaLatestMessage_1() {
+ addRecord(Collections.singletonList("inlong"));
+
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ Assertions.assertEquals(1, messages.size());
+ Assertions.assertEquals("inlong", messages.get(0).getBody());
+ }
+
+ @Test
+ void testGetKafkaLatestMessage_2() {
+ List<String> records = IntStream.range(0, 9).mapToObj(index -> "name_"
+ index).collect(Collectors.toList());
+ addRecord(records);
+
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ Assertions.assertEquals(9, messages.size());
+ }
+
+ @Test
+ void testGetKafkaLatestMessage_4() {
+ List<String> records = IntStream.range(0, 21).mapToObj(index ->
"name_" + index).collect(Collectors.toList());
+ addRecord(records);
+
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ Assertions.assertEquals(10, messages.size());
+ }
+
+}