vernedeng commented on code in PR #8777:
URL: https://github.com/apache/inlong/pull/8777#discussion_r1302365756


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java:
##########
@@ -389,27 +390,65 @@ private boolean subscriptionExists(PulsarAdmin 
pulsarAdmin, String topic, String
     public List<BriefMQMessage> queryLatestMessage(PulsarAdmin pulsarAdmin, 
String topicFullName, String subName,
             Integer messageCount, InlongStreamInfo streamInfo) {
         LOGGER.info("begin to query message for topic {}, subName={}", 
topicFullName, subName);
-
         List<BriefMQMessage> messageList = new ArrayList<>();
-        for (int i = 0; i < messageCount; i++) {
-            try {
-                Message<byte[]> pulsarMessage = 
pulsarAdmin.topics().examineMessage(topicFullName, "latest", i);
-                Map<String, String> headers = pulsarMessage.getProperties();
-                int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                        
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
-                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                        DataProxyMsgEncType.valueOf(wrapTypeId));
-                messageList.addAll(
-                        deserializeOperator.decodeMsg(streamInfo, 
pulsarMessage.getData(), headers, i));
-            } catch (Exception e) {
-                String errMsg = "decode msg error: ";
-                LOGGER.error(errMsg, e);
-                throw new BusinessException(errMsg + e.getMessage());
-            }
+        int partitionCount = getPartitionCount(pulsarAdmin, topicFullName);
+        boolean serial = partitionCount == 0;
+        for (int messageIndex = 0; messageIndex < messageCount; 
messageIndex++) {
+            int currentPartitionNum = messageIndex % partitionCount;
+            int messagePosition = messageIndex / partitionCount;
+            String topicNameOfPartition = 
buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
+            messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, 
pulsarAdmin, messageIndex,
+                    streamInfo, messagePosition));
         }
-
         LOGGER.info("success query message by subs={} for topic={}", subName, 
topicFullName);
         return messageList;
     }
 
+    /**
+     *
+     * Use pulsar admin to get topic partition count
+     */
+    private int getPartitionCount(PulsarAdmin pulsarAdmin, String 
topicFullName) {
+        PartitionedTopicMetadata partitionedTopicMetadata;
+        try {
+            partitionedTopicMetadata = pulsarAdmin.topics()
+                    .getPartitionedTopicMetadata(topicFullName);
+        } catch (Exception e) {
+            String errMsg = "get pulsar partition error ";
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg + e.getMessage());
+        }
+        return partitionedTopicMetadata.partitions;
+    }
+
+    /**
+     * Use pulsar admin to query message
+     */
+    private List<BriefMQMessage> queryMessageFromPulsar(String topic, 
PulsarAdmin pulsarAdmin, int index,

Review Comment:
   topic -> topicPartition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to