fuweng11 commented on code in PR #9937:
URL: https://github.com/apache/inlong/pull/9937#discussion_r1555573650


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java:
##########
@@ -303,35 +309,34 @@ private void deletePulsarTopic(InlongPulsarInfo 
pulsarInfo, PulsarClusterInfo pu
      */
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
             InlongStreamInfo streamInfo, Integer messageCount) throws 
Exception {
-        String groupId = streamInfo.getInlongGroupId();
+        List<ClusterInfo> pulsarClusterList = 
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
+                ClusterType.PULSAR);
+        List<BriefMQMessage> briefMQMessages = new CopyOnWriteArrayList<>();
+        QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount, 
pulsarClusterList.size());
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
-        List<ClusterInfo> pulsarClusterList =
-                
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), 
ClusterType.PULSAR);
-        String tenant = inlongPulsarInfo.getPulsarTenant();
-        if (StringUtils.isBlank(tenant) && 
CollectionUtils.isNotEmpty(pulsarClusterList)) {
-            tenant = ((PulsarClusterInfo) 
pulsarClusterList.get(0)).getPulsarTenant();
+        for (ClusterInfo clusterInfo : pulsarClusterList) {
+            QueryLatestMessagesRunnable task = new 
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
+                    (PulsarClusterInfo) clusterInfo, pulsarOperator, 
messageCount, briefMQMessages, queryLatch);
+            this.executor.execute(task);
         }
+        queryLatch.await(30, TimeUnit.SECONDS);
 
-        String namespace = groupInfo.getMqResource();
+        // insert the consumer group info into the inlong_consume table
         String topicName = streamInfo.getMqResource();
-        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
         String clusterTag = inlongPulsarInfo.getInlongClusterTag();
         String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, 
clusterTag, topicName);
-        boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-        List<BriefMQMessage> briefMQMessages = new ArrayList<>();
-        for (ClusterInfo clusterInfo : pulsarClusterList) {
-            briefMQMessages = 
pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo, 
fullTopicName, subs,
-                    messageCount, streamInfo, serial);
-            if (CollectionUtils.isNotEmpty(briefMQMessages)) {
-                break;
-            }
-        }
-
-        // insert the consumer group info into the inlong_consume table
         Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
+        String groupId = streamInfo.getInlongGroupId();
         log.info("success to save inlong consume [{}] for subs={}, groupId={}, 
topic={}",
                 id, subs, groupId, topicName);
-        return briefMQMessages;
+
+        // cut
+        int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
+        if (finalMsgCount > 0) {
+            return briefMQMessages.subList(0, finalMsgCount);
+        } else {
+            return new ArrayList<>();
+        }

Review Comment:
   ```suggestion
           int finalMsgCount = Math.max(Math.min(messageCount, 
briefMQMessages.size()), 0);
           return briefMQMessages.subList(0, finalMsgCount);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to