This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 84bd8b247d [INLONG-9936][Manager] Support for parallel acquisition of
data preview data from multiple Pulsar clusters (#9937)
84bd8b247d is described below
commit 84bd8b247d607fa5a3b7fcd95e93a6f074f0eaaf
Author: 卢春亮 <[email protected]>
AuthorDate: Mon Apr 8 19:24:12 2024 +0800
[INLONG-9936][Manager] Support for parallel acquisition of data preview
data from multiple Pulsar clusters (#9937)
---
.../queue/pulsar/PulsarQueueResourceOperator.java | 49 +++++++------
.../resource/queue/pulsar/QueryCountDownLatch.java | 55 +++++++++++++++
.../queue/pulsar/QueryLatestMessagesRunnable.java | 82 ++++++++++++++++++++++
3 files changed, 164 insertions(+), 22 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 13b859347a..b1ab068b12 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -50,7 +50,11 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -77,6 +81,8 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
@Autowired
private PulsarOperator pulsarOperator;
+ private ScheduledExecutorService executor =
Executors.newScheduledThreadPool(10);
+
@Override
public boolean accept(String mqType) {
return MQType.PULSAR.equals(mqType) ||
MQType.TDMQ_PULSAR.equals(mqType);
@@ -303,35 +309,34 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
*/
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, Integer messageCount) throws
Exception {
- String groupId = streamInfo.getInlongGroupId();
+ List<ClusterInfo> pulsarClusterList =
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
+ ClusterType.PULSAR);
+ List<BriefMQMessage> briefMQMessages =
Collections.synchronizedList(new ArrayList<>());
+ QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount,
pulsarClusterList.size());
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
- List<ClusterInfo> pulsarClusterList =
-
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
ClusterType.PULSAR);
- String tenant = inlongPulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant) &&
CollectionUtils.isNotEmpty(pulsarClusterList)) {
- tenant = ((PulsarClusterInfo)
pulsarClusterList.get(0)).getPulsarTenant();
+ for (ClusterInfo clusterInfo : pulsarClusterList) {
+ QueryLatestMessagesRunnable task = new
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
+ (PulsarClusterInfo) clusterInfo, pulsarOperator,
messageCount, briefMQMessages, queryLatch);
+ this.executor.execute(task);
}
+ queryLatch.await(30, TimeUnit.SECONDS);
- String namespace = groupInfo.getMqResource();
+ // insert the consumer group info into the inlong_consume table
String topicName = streamInfo.getMqResource();
- String fullTopicName = tenant + "/" + namespace + "/" + topicName;
String clusterTag = inlongPulsarInfo.getInlongClusterTag();
String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW,
clusterTag, topicName);
- boolean serial =
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
- List<BriefMQMessage> briefMQMessages = new ArrayList<>();
- for (ClusterInfo clusterInfo : pulsarClusterList) {
- briefMQMessages =
pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo,
fullTopicName, subs,
- messageCount, streamInfo, serial);
- if (CollectionUtils.isNotEmpty(briefMQMessages)) {
- break;
- }
- }
-
- // insert the consumer group info into the inlong_consume table
Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
+ String groupId = streamInfo.getInlongGroupId();
log.info("success to save inlong consume [{}] for subs={}, groupId={},
topic={}",
id, subs, groupId, topicName);
- return briefMQMessages;
+
+ // cut
+ int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
+ if (finalMsgCount > 0) {
+ return briefMQMessages.subList(0, finalMsgCount);
+ } else {
+ return new ArrayList<>();
+ }
}
/**
@@ -341,8 +346,8 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
Long resetTime) throws Exception {
log.info("begin to reset cursor for sinkId={}", sinkEntity.getId());
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- List<ClusterInfo> clusterInfos =
-
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(),
ClusterType.PULSAR);
+ List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(),
+ ClusterType.PULSAR);
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
new file mode 100644
index 0000000000..08970b39d7
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.pulsar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
 * Completion signal for a query that is fanned out to several tasks.
 *
 * <p>Waiters are released as soon as either of these happens:
 * <ul>
 *   <li>every expected task has called {@link #countDown(int)};</li>
 *   <li>the tasks have reported at least {@code dataSize} data items in total.</li>
 * </ul>
 * If there is nothing to wait for (no tasks, or no data requested), waiters are released immediately.
 */
public class QueryCountDownLatch {

    /** Remaining data items before waiters may be released. */
    private final CountDownLatch dataLatch;
    /** Remaining tasks that have not yet reported in. */
    private final CountDownLatch taskLatch;
    /** Single-shot gate that {@link #await()} blocks on. */
    private final CountDownLatch flagLatch;

    /**
     * Creates a latch for a fan-out query.
     *
     * @param dataSize number of data items after which waiters are released; values {@code <= 0}
     *         mean no data needs to be waited for
     * @param taskSize number of tasks expected to call {@link #countDown(int)}; values {@code <= 0}
     *         mean no task will report in
     */
    public QueryCountDownLatch(int dataSize, int taskSize) {
        // CountDownLatch rejects negative counts, so clamp them to "already done".
        this.dataLatch = new CountDownLatch(Math.max(0, dataSize));
        this.taskLatch = new CountDownLatch(Math.max(0, taskSize));
        this.flagLatch = new CountDownLatch(1);
        // With zero tasks or zero data to wait for, no countDown() call would ever open the gate,
        // so a timed await() would always run to its full timeout. Open it right away instead.
        releaseIfDone();
    }

    /**
     * Records that one task has finished and reported {@code dataDownSize} data items.
     *
     * @param dataDownSize number of data items the finishing task produced (0 if none)
     */
    public void countDown(int dataDownSize) {
        this.taskLatch.countDown();
        // Stop early once the data target is reached; further countDown() calls would be no-ops anyway.
        for (int i = 0; i < dataDownSize && this.dataLatch.getCount() > 0; i++) {
            this.dataLatch.countDown();
        }
        releaseIfDone();
    }

    /** Opens the gate once all tasks have reported or enough data has been collected. */
    private void releaseIfDone() {
        if (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0) {
            // Counting a latch of 1 down more than once is harmless, so concurrent callers are fine.
            this.flagLatch.countDown();
        }
    }

    /**
     * Blocks until the release condition is met.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void await() throws InterruptedException {
        this.flagLatch.await();
    }

    /**
     * Blocks until the release condition is met or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the release condition was met, {@code false} if the timeout elapsed first
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return this.flagLatch.await(timeout, unit);
    }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
new file mode 100644
index 0000000000..189fbcb97d
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.pulsar;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+/**
+ * QueryLatestMessagesRunnable
+ */
+public class QueryLatestMessagesRunnable implements Runnable {
+
+ public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW =
"%s_%s_consumer_group_realtime_review";
+
+ private InlongPulsarInfo inlongPulsarInfo;
+ private InlongStreamInfo streamInfo;
+ private PulsarClusterInfo clusterInfo;
+ private PulsarOperator pulsarOperator;
+ private Integer messageCount;
+ private List<BriefMQMessage> briefMQMessages;
+ private QueryCountDownLatch latch;
+
+ public QueryLatestMessagesRunnable(InlongPulsarInfo inlongPulsarInfo,
+ InlongStreamInfo streamInfo,
+ PulsarClusterInfo clusterInfo,
+ PulsarOperator pulsarOperator,
+ Integer messageCount,
+ List<BriefMQMessage> briefMQMessages,
+ QueryCountDownLatch latch) {
+ this.inlongPulsarInfo = inlongPulsarInfo;
+ this.streamInfo = streamInfo;
+ this.clusterInfo = clusterInfo;
+ this.pulsarOperator = pulsarOperator;
+ this.messageCount = messageCount;
+ this.briefMQMessages = briefMQMessages;
+ this.latch = latch;
+ }
+
+ @Override
+ public void run() {
+ String tenant = inlongPulsarInfo.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = clusterInfo.getPulsarTenant();
+ }
+
+ String namespace = inlongPulsarInfo.getMqResource();
+ String topicName = streamInfo.getMqResource();
+ String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+ String clusterTag = inlongPulsarInfo.getInlongClusterTag();
+ String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW,
clusterTag, topicName);
+ boolean serial =
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
+ List<BriefMQMessage> messages =
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, subs,
+ messageCount, streamInfo, serial);
+ if (CollectionUtils.isNotEmpty(messages)) {
+ briefMQMessages.addAll(messages);
+ this.latch.countDown(messages.size());
+ }
+ }
+}