fuweng11 commented on code in PR #9937:
URL: https://github.com/apache/inlong/pull/9937#discussion_r1555573650
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java:
##########
@@ -303,35 +309,34 @@ private void deletePulsarTopic(InlongPulsarInfo
pulsarInfo, PulsarClusterInfo pu
*/
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, Integer messageCount) throws
Exception {
- String groupId = streamInfo.getInlongGroupId();
+ List<ClusterInfo> pulsarClusterList =
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
+ ClusterType.PULSAR);
+ List<BriefMQMessage> briefMQMessages = new CopyOnWriteArrayList<>();
+ QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount,
pulsarClusterList.size());
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
- List<ClusterInfo> pulsarClusterList =
-
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
ClusterType.PULSAR);
- String tenant = inlongPulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant) &&
CollectionUtils.isNotEmpty(pulsarClusterList)) {
- tenant = ((PulsarClusterInfo)
pulsarClusterList.get(0)).getPulsarTenant();
+ for (ClusterInfo clusterInfo : pulsarClusterList) {
+ QueryLatestMessagesRunnable task = new
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
+ (PulsarClusterInfo) clusterInfo, pulsarOperator,
messageCount, briefMQMessages, queryLatch);
+ this.executor.execute(task);
}
+ queryLatch.await(30, TimeUnit.SECONDS);
- String namespace = groupInfo.getMqResource();
+ // insert the consumer group info into the inlong_consume table
String topicName = streamInfo.getMqResource();
- String fullTopicName = tenant + "/" + namespace + "/" + topicName;
String clusterTag = inlongPulsarInfo.getInlongClusterTag();
String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW,
clusterTag, topicName);
- boolean serial =
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
- List<BriefMQMessage> briefMQMessages = new ArrayList<>();
- for (ClusterInfo clusterInfo : pulsarClusterList) {
- briefMQMessages =
pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo,
fullTopicName, subs,
- messageCount, streamInfo, serial);
- if (CollectionUtils.isNotEmpty(briefMQMessages)) {
- break;
- }
- }
-
- // insert the consumer group info into the inlong_consume table
Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
+ String groupId = streamInfo.getInlongGroupId();
log.info("success to save inlong consume [{}] for subs={}, groupId={},
topic={}",
id, subs, groupId, topicName);
- return briefMQMessages;
+
+ // cut
+ int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
+ if (finalMsgCount > 0) {
+ return briefMQMessages.subList(0, finalMsgCount);
+ } else {
+ return new ArrayList<>();
+ }
Review Comment:
```suggestion
int finalMsgCount = Math.max(Math.min(messageCount,
briefMQMessages.size()), 0);
return briefMQMessages.subList(0, finalMsgCount);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]