This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c62bfff4ed [INLONG-9902][Manager] Data preview supports pulsar multi 
cluster (#9903)
c62bfff4ed is described below

commit c62bfff4ed40b8c057c9901a101002e3de6197b3
Author: fuweng11 <[email protected]>
AuthorDate: Sun Mar 31 23:05:01 2024 +0800

    [INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)
---
 .../queue/pulsar/PulsarQueueResourceOperator.java     | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index e6e1d66ef5..13b859347a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -49,6 +49,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -304,11 +305,11 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
             InlongStreamInfo streamInfo, Integer messageCount) throws 
Exception {
         String groupId = streamInfo.getInlongGroupId();
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) 
clusterService.getOne(groupInfo.getInlongClusterTag(),
-                null, ClusterType.PULSAR);
+        List<ClusterInfo> pulsarClusterList =
+                
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), 
ClusterType.PULSAR);
         String tenant = inlongPulsarInfo.getPulsarTenant();
-        if (StringUtils.isBlank(tenant)) {
-            tenant = pulsarCluster.getPulsarTenant();
+        if (StringUtils.isBlank(tenant) && 
CollectionUtils.isNotEmpty(pulsarClusterList)) {
+            tenant = ((PulsarClusterInfo) 
pulsarClusterList.get(0)).getPulsarTenant();
         }
 
         String namespace = groupInfo.getMqResource();
@@ -317,8 +318,14 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
         String clusterTag = inlongPulsarInfo.getInlongClusterTag();
         String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, 
clusterTag, topicName);
         boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-        List<BriefMQMessage> briefMQMessages = 
pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs,
-                messageCount, streamInfo, serial);
+        List<BriefMQMessage> briefMQMessages = new ArrayList<>();
+        for (ClusterInfo clusterInfo : pulsarClusterList) {
+            briefMQMessages = 
pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo, 
fullTopicName, subs,
+                    messageCount, streamInfo, serial);
+            if (CollectionUtils.isNotEmpty(briefMQMessages)) {
+                break;
+            }
+        }
 
         // insert the consumer group info into the inlong_consume table
         Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);

Reply via email to