This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ea439321 [INLONG-10249][Manager] Fix the problem of duplicate data 
appearing during data preview (#10250)
59ea439321 is described below

commit 59ea4393215579f55918befbfb43baef1b5e2875
Author: fuweng11 <[email protected]>
AuthorDate: Wed May 22 12:48:39 2024 +0800

    [INLONG-10249][Manager] Fix the problem of duplicate data appearing during 
data preview (#10250)
---
 .../service/resource/queue/pulsar/PulsarOperator.java      |  7 +++----
 .../resource/queue/pulsar/PulsarQueueResourceOperator.java | 14 ++------------
 .../resource/queue/pulsar/QueryLatestMessagesRunnable.java |  8 ++------
 3 files changed, 7 insertions(+), 22 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 45e6112ce3..5c9e9f5d10 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -409,19 +409,18 @@ public class PulsarOperator {
      * Query topic message for the given pulsar cluster.
      */
     public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo 
pulsarClusterInfo, String topicFullName,
-            String subName,
             Integer messageCount, InlongStreamInfo streamInfo, boolean serial) 
{
-        LOGGER.info("begin to query message for topic {}, subName={}", 
topicFullName, subName);
+        LOGGER.info("begin to query message for topic {}", topicFullName);
         List<BriefMQMessage> messageList = new ArrayList<>();
         int partitionCount = getPartitionCount(pulsarClusterInfo, 
topicFullName);
         for (int messageIndex = 0; messageIndex < messageCount; 
messageIndex++) {
             int currentPartitionNum = messageIndex % partitionCount;
-            int messagePosition = messageIndex / partitionCount;
+            int messagePosition = messageIndex / partitionCount + 1;
             String topicNameOfPartition = 
buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
             messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, 
pulsarClusterInfo, messageIndex,
                     streamInfo, messagePosition));
         }
-        LOGGER.info("success query message by subs={} for topic={}", subName, 
topicFullName);
+        LOGGER.info("success query message for topic={}", topicFullName);
         return messageList;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index b1ab068b12..6d86760ea4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -68,8 +68,6 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
      */
     public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
 
-    public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = 
"%s_%s_consumer_group_realtime_review";
-
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
@@ -320,17 +318,9 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
             this.executor.execute(task);
         }
         queryLatch.await(30, TimeUnit.SECONDS);
+        log.info("success query pulsar message for groupId={}, streamId={}", 
streamInfo.getInlongGroupId(),
+                streamInfo.getInlongStreamId());
 
-        // insert the consumer group info into the inlong_consume table
-        String topicName = streamInfo.getMqResource();
-        String clusterTag = inlongPulsarInfo.getInlongClusterTag();
-        String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, 
clusterTag, topicName);
-        Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
-        String groupId = streamInfo.getInlongGroupId();
-        log.info("success to save inlong consume [{}] for subs={}, groupId={}, 
topic={}",
-                id, subs, groupId, topicName);
-
-        // cut
         int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
         if (finalMsgCount > 0) {
             return briefMQMessages.subList(0, finalMsgCount);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
index 189fbcb97d..caf61e0ebe 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -33,8 +33,6 @@ import java.util.List;
  */
 public class QueryLatestMessagesRunnable implements Runnable {
 
-    public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = 
"%s_%s_consumer_group_realtime_review";
-
     private InlongPulsarInfo inlongPulsarInfo;
     private InlongStreamInfo streamInfo;
     private PulsarClusterInfo clusterInfo;
@@ -69,11 +67,9 @@ public class QueryLatestMessagesRunnable implements Runnable 
{
         String namespace = inlongPulsarInfo.getMqResource();
         String topicName = streamInfo.getMqResource();
         String fullTopicName = tenant + "/" + namespace + "/" + topicName;
-        String clusterTag = inlongPulsarInfo.getInlongClusterTag();
-        String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, 
clusterTag, topicName);
         boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-        List<BriefMQMessage> messages = 
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, subs,
-                messageCount, streamInfo, serial);
+        List<BriefMQMessage> messages =
+                pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, 
messageCount, streamInfo, serial);
         if (CollectionUtils.isNotEmpty(messages)) {
             briefMQMessages.addAll(messages);
             this.latch.countDown(messages.size());

Reply via email to