This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b4b184  [FLINK-35477] Make initial cursor position configurable
3b4b184 is described below

commit 3b4b184b1a9d5f57fa0649593ccbbc671e40b53a
Author: xu.guo <[email protected]>
AuthorDate: Mon Apr 14 08:24:39 2025 +0200

    [FLINK-35477] Make initial cursor position configurable
    
    Closes #105
---
 .../connector/pulsar/source/PulsarSourceBuilder.java      |  9 +++++++++
 .../connector/pulsar/source/PulsarSourceOptions.java      |  8 ++++++++
 .../pulsar/source/config/SourceConfiguration.java         | 15 +++++++++++++--
 .../pulsar/source/enumerator/PulsarSourceEnumerator.java  |  3 ++-
 .../pulsar/source/reader/PulsarPartitionSplitReader.java  |  3 +++
 5 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 687e9b8..ac9695a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeIn
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
@@ -65,6 +67,7 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -590,6 +593,12 @@ public final class PulsarSourceBuilder<OUT> {
             }
         }
 
+        if (Objects.equals(startCursor, StartCursor.latest())) {
+            configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Latest);
+        } else {
+            configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Earliest);
+        }
+
         // Make sure they are serializable.
         checkState(
                 isSerializable(deserializationSchema),
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index de1595a..3346fed 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.source.config.CursorVerification;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 
 import java.time.Duration;
@@ -232,6 +233,13 @@ public final class PulsarSourceOptions {
                                             code("StartCursor"))
                                     .build());
 
+    public static final ConfigOption<SubscriptionInitialPosition> PULSAR_INITIAL_CURSOR =
+            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "initialCursor")
+                    .enumType(SubscriptionInitialPosition.class)
+                    .defaultValue(SubscriptionInitialPosition.Latest)
+                    .withDescription(
+                            Description.builder().text("Consumer initial position.").build());
+
     
///////////////////////////////////////////////////////////////////////////////
     //
     // The configuration for ConsumerConfigurationData part.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 2b47297..765db30 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 
@@ -40,6 +41,7 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
@@ -68,6 +70,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final boolean enableSchemaEvolution;
     private final boolean enableMetrics;
     private final boolean resetSubscriptionCursor;
+    private final SubscriptionInitialPosition subscriptionInitialPosition;
 
     public SourceConfiguration(Configuration configuration) {
         super(configuration);
@@ -87,6 +90,7 @@ public class SourceConfiguration extends PulsarConfiguration {
         this.enableMetrics =
                 get(PULSAR_ENABLE_SOURCE_METRICS) && 
get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
         this.resetSubscriptionCursor = get(PULSAR_RESET_SUBSCRIPTION_CURSOR);
+        this.subscriptionInitialPosition = get(PULSAR_INITIAL_CURSOR);
     }
 
     /** The capacity of the element queue in the source reader. */
@@ -209,6 +213,11 @@ public class SourceConfiguration extends 
PulsarConfiguration {
         return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() + 
")";
     }
 
+    /** The initial position for the subscription. */
+    public SubscriptionInitialPosition getInitialPosition() {
+        return subscriptionInitialPosition;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -234,7 +243,8 @@ public class SourceConfiguration extends 
PulsarConfiguration {
                 && allowKeySharedOutOfOrderDelivery == 
that.allowKeySharedOutOfOrderDelivery
                 && enableSchemaEvolution == that.enableSchemaEvolution
                 && enableMetrics == that.enableMetrics
-                && resetSubscriptionCursor == that.resetSubscriptionCursor;
+                && resetSubscriptionCursor == that.resetSubscriptionCursor
+                && subscriptionInitialPosition == that.subscriptionInitialPosition;
     }
 
     @Override
@@ -254,6 +264,7 @@ public class SourceConfiguration extends 
PulsarConfiguration {
                 allowKeySharedOutOfOrderDelivery,
                 enableSchemaEvolution,
                 enableMetrics,
-                resetSubscriptionCursor);
+                resetSubscriptionCursor,
+                subscriptionInitialPosition);
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 4837b4d..550bb7d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -220,7 +220,8 @@ public class PulsarSourceEnumerator
                     startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
             try {
-                //If resetSubscriptionCursor is set to true, the position is reset to the position specified by StartCursor each time
+                // If resetSubscriptionCursor is set to true, the position is reset to the position
+                // specified by StartCursor each time
                 if (sourceConfiguration.isResetSubscriptionCursor()) {
                     position.setupSubPosition(pulsarClient, topic, 
subscriptionName);
                 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 2b196e3..6e76e98 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -313,6 +313,9 @@ public class PulsarPartitionSplitReader
             consumerBuilder.keySharedPolicy(policy);
         }
 
+        // set initial position
+        consumerBuilder.subscriptionInitialPosition(sourceConfiguration.getInitialPosition());
+
         // Create the consumer configuration by using common utils.
         Consumer<byte[]> consumer = consumerBuilder.subscribe();
 

Reply via email to