This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 13909ba19 [INLONG-7940][Sort] Use pulsar subscriptions and specify
subscription offset in pulsar connector (#7943)
13909ba19 is described below
commit 13909ba19c7f0054aa54351700b01a4efd1c1d19
Author: Schnapps <[email protected]>
AuthorDate: Mon May 8 14:10:15 2023 +0800
[INLONG-7940][Sort] Use pulsar subscriptions and specify subscription
offset in pulsar connector (#7943)
---
.../node/extract/PulsarExtractNodeTest.java | 4 +-
.../sort/pulsar/internal/FlinkPulsarSource.java | 10 +++++
.../sort/pulsar/internal/PulsarMetadataReader.java | 45 ++++++++++++++++++++++
.../inlong/sort/parser/PulsarSqlParserTest.java | 4 +-
4 files changed, 59 insertions(+), 4 deletions(-)
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
index d9e9f5d6e..8ab726472 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
@@ -50,7 +50,7 @@ public class PulsarExtractNodeTest extends SerializeBaseTest<Node> {
format,
"earliest",
null,
- null,
- null);
+ "subscription",
+ "earliest");
}
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 3ec787d0d..9225abd43 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -1006,6 +1006,16 @@ public class FlinkPulsarSource<T>
}
}
return specificOffsets;
+ case EXTERNAL_SUBSCRIPTION:
+ Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
+ for (TopicRange topic : topics) {
+ offsetsFromSubs.put(
+ topic,
+ metadataReader.getPositionFromSubscription(
+ topic, subscriptionPosition));
+ }
+ log.info("offset for each topic: {}", offsetsFromSubs);
+ return offsetsFromSubs;
}
return null;
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
index 10b44b370..780e6803b 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.pulsar.internal;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
@@ -33,9 +36,13 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -260,6 +267,44 @@ public class PulsarMetadataReader implements AutoCloseable {
}
}
+ public MessageId getPositionFromSubscription(TopicRange topic, MessageId defaultPosition) {
+ try {
+ String subscriptionName = subscriptionNameFrom(topic);
+ TopicStats topicStats = admin.topics().getStats(topic.getTopic());
+ if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
+ SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
+ if (subStats.getConsumers().size() != 0) {
+ throw new IllegalStateException(
+ "Subscription been actively used by other consumers, "
+ + "in this situation, the exactly-once semantics cannot be guaranteed.");
+ } else {
+ String encodedSubName =
+ URLEncoder.encode(subscriptionName, StandardCharsets.UTF_8.toString());
+ PersistentTopicInternalStats.CursorStats c =
+ admin.topics()
+ .getInternalStats(topic.getTopic()).cursors
+ .get(encodedSubName);
+ String[] ids = c.markDeletePosition.split(":", 2);
+ long ledgerId = Long.parseLong(ids[0]);
+ long entryIdInMarkDelete = Long.parseLong(ids[1]);
+ // we are getting the next mid from sub position, if the entryId is -1,
+ // it denotes we haven't read data from the ledger before,
+ // therefore no need to skip the current entry for the next position
+ long entryId = entryIdInMarkDelete == -1 ? -1 : entryIdInMarkDelete + 1;
+ int partitionIdx = TopicName.getPartitionIndex(topic.getTopic());
+ return new MessageIdImpl(ledgerId, entryId, partitionIdx);
+ }
+ } else {
+ // create sub on topic
+ admin.topics()
+ .createSubscription(topic.getTopic(), subscriptionName, defaultPosition);
+ return defaultPosition;
+ }
+ } catch (PulsarAdminException | UnsupportedEncodingException e) {
+ throw new IllegalStateException("Failed to get stats for topic " + topic, e);
+ }
+ }
+
/**
* Designate the close of the metadata reader.
*/
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index bc5774dba..8c72bfbfb 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -74,8 +74,8 @@ public class PulsarSqlParserTest {
format,
"earliest",
null,
- null,
- null);
+ "test",
+ "earliest");
}
private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {