This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 84bd8b247d [INLONG-9936][Manager] Support for parallel acquisition of data preview data from multiple Pulsar clusters (#9937)
84bd8b247d is described below

commit 84bd8b247d607fa5a3b7fcd95e93a6f074f0eaaf
Author: 卢春亮 <[email protected]>
AuthorDate: Mon Apr 8 19:24:12 2024 +0800

    [INLONG-9936][Manager] Support for parallel acquisition of data preview data from multiple Pulsar clusters (#9937)
---
 .../queue/pulsar/PulsarQueueResourceOperator.java  | 49 +++++++------
 .../resource/queue/pulsar/QueryCountDownLatch.java | 55 +++++++++++++++
 .../queue/pulsar/QueryLatestMessagesRunnable.java  | 82 ++++++++++++++++++++++
 3 files changed, 164 insertions(+), 22 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 13b859347a..b1ab068b12 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -50,7 +50,11 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -77,6 +81,8 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
     @Autowired
     private PulsarOperator pulsarOperator;
 
+    private ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
+
     @Override
     public boolean accept(String mqType) {
         return MQType.PULSAR.equals(mqType) || 
MQType.TDMQ_PULSAR.equals(mqType);
@@ -303,35 +309,34 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
      */
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
             InlongStreamInfo streamInfo, Integer messageCount) throws 
Exception {
-        String groupId = streamInfo.getInlongGroupId();
+        List<ClusterInfo> pulsarClusterList = 
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
+                ClusterType.PULSAR);
+        List<BriefMQMessage> briefMQMessages = 
Collections.synchronizedList(new ArrayList<>());
+        QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount, 
pulsarClusterList.size());
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
-        List<ClusterInfo> pulsarClusterList =
-                
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), 
ClusterType.PULSAR);
-        String tenant = inlongPulsarInfo.getPulsarTenant();
-        if (StringUtils.isBlank(tenant) && 
CollectionUtils.isNotEmpty(pulsarClusterList)) {
-            tenant = ((PulsarClusterInfo) 
pulsarClusterList.get(0)).getPulsarTenant();
+        for (ClusterInfo clusterInfo : pulsarClusterList) {
+            QueryLatestMessagesRunnable task = new 
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
+                    (PulsarClusterInfo) clusterInfo, pulsarOperator, 
messageCount, briefMQMessages, queryLatch);
+            this.executor.execute(task);
         }
+        queryLatch.await(30, TimeUnit.SECONDS);
 
-        String namespace = groupInfo.getMqResource();
+        // insert the consumer group info into the inlong_consume table
         String topicName = streamInfo.getMqResource();
-        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
         String clusterTag = inlongPulsarInfo.getInlongClusterTag();
         String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, 
clusterTag, topicName);
-        boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-        List<BriefMQMessage> briefMQMessages = new ArrayList<>();
-        for (ClusterInfo clusterInfo : pulsarClusterList) {
-            briefMQMessages = 
pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo, 
fullTopicName, subs,
-                    messageCount, streamInfo, serial);
-            if (CollectionUtils.isNotEmpty(briefMQMessages)) {
-                break;
-            }
-        }
-
-        // insert the consumer group info into the inlong_consume table
         Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
+        String groupId = streamInfo.getInlongGroupId();
         log.info("success to save inlong consume [{}] for subs={}, groupId={}, 
topic={}",
                 id, subs, groupId, topicName);
-        return briefMQMessages;
+
+        // cut
+        int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
+        if (finalMsgCount > 0) {
+            return briefMQMessages.subList(0, finalMsgCount);
+        } else {
+            return new ArrayList<>();
+        }
     }
 
     /**
@@ -341,8 +346,8 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
             Long resetTime) throws Exception {
         log.info("begin to reset cursor for sinkId={}", sinkEntity.getId());
         InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-        List<ClusterInfo> clusterInfos =
-                
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(), 
ClusterType.PULSAR);
+        List<ClusterInfo> clusterInfos = 
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(),
+                ClusterType.PULSAR);
         for (ClusterInfo clusterInfo : clusterInfos) {
             PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
new file mode 100644
index 0000000000..08970b39d7
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.pulsar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QueryCountDownLatch
+ */
/**
 * Completion barrier for a query that is fanned out to several tasks.
 *
 * <p>Waiters are released as soon as EITHER every task has reported back, OR the reported
 * data items reach the requested amount, whichever happens first. Tasks report through
 * {@link #countDown(int)}; callers block in {@link #await()} or {@link #await(long, TimeUnit)}.
 */
public class QueryCountDownLatch {

    /** Data items still needed before waiters are released. */
    private final CountDownLatch dataLatch;
    /** Tasks that have not yet reported back. */
    private final CountDownLatch taskLatch;
    /** Single-shot gate the waiters block on; opened once either counter above reaches zero. */
    private final CountDownLatch flagLatch;

    /**
     * Creates a barrier.
     *
     * @param dataSize number of data items after which waiters are released
     * @param taskSize number of tasks expected to call {@link #countDown(int)} exactly once
     * @throws IllegalArgumentException if either size is negative
     */
    public QueryCountDownLatch(int dataSize, int taskSize) {
        this.dataLatch = new CountDownLatch(dataSize);
        this.taskLatch = new CountDownLatch(taskSize);
        this.flagLatch = new CountDownLatch(1);
        // With zero tasks or zero requested items, no countDown() call would ever open the
        // gate. Open it now instead of leaving callers blocked until their timeout.
        releaseIfDone();
    }

    /**
     * Reports that one task has finished and contributed {@code dataDownSize} data items.
     * Each task should call this exactly once. It passes 0 when it produced nothing or failed.
     *
     * @param dataDownSize number of data items the finished task contributed
     */
    public void countDown(int dataDownSize) {
        this.taskLatch.countDown();
        // Stop early once the data counter is exhausted; further countDown() calls are no-ops.
        for (int i = 0; i < dataDownSize && this.dataLatch.getCount() > 0; i++) {
            this.dataLatch.countDown();
        }
        releaseIfDone();
    }

    /** Opens the gate when all tasks have reported or enough data has arrived. */
    private void releaseIfDone() {
        if (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0) {
            // Idempotent: counting down an already-open latch has no effect.
            this.flagLatch.countDown();
        }
    }

    /**
     * Blocks until the barrier opens.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void await() throws InterruptedException {
        this.flagLatch.await();
    }

    /**
     * Blocks until the barrier opens or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the barrier opened, {@code false} if the timeout elapsed first
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return this.flagLatch.await(timeout, unit);
    }
}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
new file mode 100644
index 0000000000..189fbcb97d
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.pulsar;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+/**
+ * QueryLatestMessagesRunnable
+ */
+public class QueryLatestMessagesRunnable implements Runnable {
+
+    public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = 
"%s_%s_consumer_group_realtime_review";
+
+    private InlongPulsarInfo inlongPulsarInfo;
+    private InlongStreamInfo streamInfo;
+    private PulsarClusterInfo clusterInfo;
+    private PulsarOperator pulsarOperator;
+    private Integer messageCount;
+    private List<BriefMQMessage> briefMQMessages;
+    private QueryCountDownLatch latch;
+
+    public QueryLatestMessagesRunnable(InlongPulsarInfo inlongPulsarInfo,
+            InlongStreamInfo streamInfo,
+            PulsarClusterInfo clusterInfo,
+            PulsarOperator pulsarOperator,
+            Integer messageCount,
+            List<BriefMQMessage> briefMQMessages,
+            QueryCountDownLatch latch) {
+        this.inlongPulsarInfo = inlongPulsarInfo;
+        this.streamInfo = streamInfo;
+        this.clusterInfo = clusterInfo;
+        this.pulsarOperator = pulsarOperator;
+        this.messageCount = messageCount;
+        this.briefMQMessages = briefMQMessages;
+        this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+        String tenant = inlongPulsarInfo.getPulsarTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = clusterInfo.getPulsarTenant();
+        }
+
+        String namespace = inlongPulsarInfo.getMqResource();
+        String topicName = streamInfo.getMqResource();
+        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+        String clusterTag = inlongPulsarInfo.getInlongClusterTag();
+        String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, 
clusterTag, topicName);
+        boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
+        List<BriefMQMessage> messages = 
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, subs,
+                messageCount, streamInfo, serial);
+        if (CollectionUtils.isNotEmpty(messages)) {
+            briefMQMessages.addAll(messages);
+            this.latch.countDown(messages.size());
+        }
+    }
+}

Reply via email to