vernedeng commented on code in PR #8777:
URL: https://github.com/apache/inlong/pull/8777#discussion_r1301014133


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java:
##########
@@ -389,27 +390,50 @@ private boolean subscriptionExists(PulsarAdmin 
pulsarAdmin, String topic, String
     public List<BriefMQMessage> queryLatestMessage(PulsarAdmin pulsarAdmin, 
String topicFullName, String subName,
             Integer messageCount, InlongStreamInfo streamInfo) {
         LOGGER.info("begin to query message for topic {}, subName={}", 
topicFullName, subName);
-
         List<BriefMQMessage> messageList = new ArrayList<>();
-        for (int i = 0; i < messageCount; i++) {
-            try {
-                Message<byte[]> pulsarMessage = 
pulsarAdmin.topics().examineMessage(topicFullName, "latest", i);
-                Map<String, String> headers = pulsarMessage.getProperties();
-                int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                        
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
-                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                        DataProxyMsgEncType.valueOf(wrapTypeId));
-                messageList.addAll(
-                        deserializeOperator.decodeMsg(streamInfo, 
pulsarMessage.getData(), headers, i));
-            } catch (Exception e) {
-                String errMsg = "decode msg error: ";
-                LOGGER.error(errMsg, e);
-                throw new BusinessException(errMsg + e.getMessage());
+        try {
+            PartitionedTopicMetadata partitionedTopicMetadata = 
pulsarAdmin.topics()
+                    .getPartitionedTopicMetadata(topicFullName);
+            int partitionCount = partitionedTopicMetadata.partitions;
+            boolean serial = partitionCount == 0;
+            for (int messageIndex = 0; messageIndex < messageCount; 
messageIndex++) {
+                int currentPartitionNum = messageIndex % partitionCount;
+                int messagePosition = messageIndex / partitionCount;
+                String topicNameOfPartition = 
buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
+                
messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarAdmin, 
messageIndex,

Review Comment:
   Please process exception for each partition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to