This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 59ea439321 [INLONG-10249][Manager] Fix the problem of duplicate data
appears during data preview (#10250)
59ea439321 is described below
commit 59ea4393215579f55918befbfb43baef1b5e2875
Author: fuweng11 <[email protected]>
AuthorDate: Wed May 22 12:48:39 2024 +0800
[INLONG-10249][Manager] Fix the problem of duplicate data appears during
data preview (#10250)
---
.../service/resource/queue/pulsar/PulsarOperator.java | 7 +++----
.../resource/queue/pulsar/PulsarQueueResourceOperator.java | 14 ++------------
.../resource/queue/pulsar/QueryLatestMessagesRunnable.java | 8 ++------
3 files changed, 7 insertions(+), 22 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 45e6112ce3..5c9e9f5d10 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -409,19 +409,18 @@ public class PulsarOperator {
* Query topic message for the given pulsar cluster.
*/
public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo
pulsarClusterInfo, String topicFullName,
- String subName,
Integer messageCount, InlongStreamInfo streamInfo, boolean serial)
{
- LOGGER.info("begin to query message for topic {}, subName={}",
topicFullName, subName);
+ LOGGER.info("begin to query message for topic {}", topicFullName);
List<BriefMQMessage> messageList = new ArrayList<>();
int partitionCount = getPartitionCount(pulsarClusterInfo,
topicFullName);
for (int messageIndex = 0; messageIndex < messageCount;
messageIndex++) {
int currentPartitionNum = messageIndex % partitionCount;
- int messagePosition = messageIndex / partitionCount;
+ int messagePosition = messageIndex / partitionCount + 1;
String topicNameOfPartition =
buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
messageList.addAll(queryMessageFromPulsar(topicNameOfPartition,
pulsarClusterInfo, messageIndex,
streamInfo, messagePosition));
}
- LOGGER.info("success query message by subs={} for topic={}", subName,
topicFullName);
+ LOGGER.info("success query message for topic={}", topicFullName);
return messageList;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index b1ab068b12..6d86760ea4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -68,8 +68,6 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
*/
public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
- public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW =
"%s_%s_consumer_group_realtime_review";
-
@Autowired
private InlongClusterService clusterService;
@Autowired
@@ -320,17 +318,9 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
this.executor.execute(task);
}
queryLatch.await(30, TimeUnit.SECONDS);
+ log.info("success query pulsar message for groupId={}, streamId={}",
streamInfo.getInlongGroupId(),
+ streamInfo.getInlongStreamId());
- // insert the consumer group info into the inlong_consume table
- String topicName = streamInfo.getMqResource();
- String clusterTag = inlongPulsarInfo.getInlongClusterTag();
- String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW,
clusterTag, topicName);
- Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
- String groupId = streamInfo.getInlongGroupId();
- log.info("success to save inlong consume [{}] for subs={}, groupId={},
topic={}",
- id, subs, groupId, topicName);
-
- // cut
int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
if (finalMsgCount > 0) {
return briefMQMessages.subList(0, finalMsgCount);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
index 189fbcb97d..caf61e0ebe 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -33,8 +33,6 @@ import java.util.List;
*/
public class QueryLatestMessagesRunnable implements Runnable {
- public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW =
"%s_%s_consumer_group_realtime_review";
-
private InlongPulsarInfo inlongPulsarInfo;
private InlongStreamInfo streamInfo;
private PulsarClusterInfo clusterInfo;
@@ -69,11 +67,9 @@ public class QueryLatestMessagesRunnable implements Runnable
{
String namespace = inlongPulsarInfo.getMqResource();
String topicName = streamInfo.getMqResource();
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
- String clusterTag = inlongPulsarInfo.getInlongClusterTag();
- String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW,
clusterTag, topicName);
boolean serial =
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
- List<BriefMQMessage> messages =
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, subs,
- messageCount, streamInfo, serial);
+ List<BriefMQMessage> messages =
+ pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName,
messageCount, streamInfo, serial);
if (CollectionUtils.isNotEmpty(messages)) {
briefMQMessages.addAll(messages);
this.latch.countDown(messages.size());