This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 13909ba19 [INLONG-7940][Sort] Use pulsar subscriptions and specify subscription offset in pulsar connector (#7943)
13909ba19 is described below

commit 13909ba19c7f0054aa54351700b01a4efd1c1d19
Author: Schnapps <[email protected]>
AuthorDate: Mon May 8 14:10:15 2023 +0800

    [INLONG-7940][Sort] Use pulsar subscriptions and specify subscription offset in pulsar connector (#7943)
---
 .../node/extract/PulsarExtractNodeTest.java        |  4 +-
 .../sort/pulsar/internal/FlinkPulsarSource.java    | 10 +++++
 .../sort/pulsar/internal/PulsarMetadataReader.java | 45 ++++++++++++++++++++++
 .../inlong/sort/parser/PulsarSqlParserTest.java    |  4 +-
 4 files changed, 59 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
index d9e9f5d6e..8ab726472 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
@@ -50,7 +50,7 @@ public class PulsarExtractNodeTest extends SerializeBaseTest<Node> {
                 format,
                 "earliest",
                 null,
-                null,
-                null);
+                "subscription",
+                "earliest");
     }
 }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 3ec787d0d..9225abd43 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -1006,6 +1006,16 @@ public class FlinkPulsarSource<T>
                     }
                 }
                 return specificOffsets;
+            case EXTERNAL_SUBSCRIPTION:
+                Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
+                for (TopicRange topic : topics) {
+                    offsetsFromSubs.put(
+                            topic,
+                            metadataReader.getPositionFromSubscription(
+                                    topic, subscriptionPosition));
+                }
+                log.info("offset for each topic: {}", offsetsFromSubs);
+                return offsetsFromSubs;
         }
         return null;
     }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
index 10b44b370..780e6803b 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.pulsar.internal;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
@@ -33,9 +36,13 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.shade.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -260,6 +267,44 @@ public class PulsarMetadataReader implements AutoCloseable {
         }
     }
 
+    public MessageId getPositionFromSubscription(TopicRange topic, MessageId defaultPosition) {
+        try {
+            String subscriptionName = subscriptionNameFrom(topic);
+            TopicStats topicStats = admin.topics().getStats(topic.getTopic());
+            if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
+                SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
+                if (subStats.getConsumers().size() != 0) {
+                    throw new IllegalStateException(
+                            "Subscription been actively used by other consumers, "
+                                    + "in this situation, the exactly-once semantics cannot be guaranteed.");
+                } else {
+                    String encodedSubName =
+                            URLEncoder.encode(subscriptionName, StandardCharsets.UTF_8.toString());
+                    PersistentTopicInternalStats.CursorStats c =
+                            admin.topics()
+                                    .getInternalStats(topic.getTopic()).cursors
+                                            .get(encodedSubName);
+                    String[] ids = c.markDeletePosition.split(":", 2);
+                    long ledgerId = Long.parseLong(ids[0]);
+                    long entryIdInMarkDelete = Long.parseLong(ids[1]);
+                    // we are getting the next mid from sub position, if the entryId is -1,
+                    // it denotes we haven't read data from the ledger before,
+                    // therefore no need to skip the current entry for the next position
+                    long entryId = entryIdInMarkDelete == -1 ? -1 : entryIdInMarkDelete + 1;
+                    int partitionIdx = TopicName.getPartitionIndex(topic.getTopic());
+                    return new MessageIdImpl(ledgerId, entryId, partitionIdx);
+                }
+            } else {
+                // create sub on topic
+                admin.topics()
+                        .createSubscription(topic.getTopic(), subscriptionName, defaultPosition);
+                return defaultPosition;
+            }
+        } catch (PulsarAdminException | UnsupportedEncodingException e) {
+            throw new IllegalStateException("Failed to get stats for topic " + topic, e);
+        }
+    }
+
     /**
      * Designate the close of the metadata reader.
      */
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index bc5774dba..8c72bfbfb 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -74,8 +74,8 @@ public class PulsarSqlParserTest {
                 format,
                 "earliest",
                 null,
-                null,
-                null);
+                "test",
+                "earliest");
     }
 
     private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {

Reply via email to