This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/main by this push:
new 3b4b184 [FLINK-35477] Make initial cursor position configurable
3b4b184 is described below
commit 3b4b184b1a9d5f57fa0649593ccbbc671e40b53a
Author: xu.guo <[email protected]>
AuthorDate: Mon Apr 14 08:24:39 2025 +0200
[FLINK-35477] Make initial cursor position configurable
Closes #105
---
.../connector/pulsar/source/PulsarSourceBuilder.java | 9 +++++++++
.../connector/pulsar/source/PulsarSourceOptions.java | 8 ++++++++
.../pulsar/source/config/SourceConfiguration.java | 15 +++++++++++++--
.../pulsar/source/enumerator/PulsarSourceEnumerator.java | 3 ++-
.../pulsar/source/reader/PulsarPartitionSplitReader.java | 3 +++
5 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 687e9b8..ac9695a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -46,6 +46,7 @@ import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeIn
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
@@ -65,6 +67,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -590,6 +593,12 @@ public final class PulsarSourceBuilder<OUT> {
}
}
+ if (Objects.equals(startCursor, StartCursor.latest())) {
+ configBuilder.set(PULSAR_INITIAL_CURSOR,
SubscriptionInitialPosition.Latest);
+ } else {
+ configBuilder.set(PULSAR_INITIAL_CURSOR,
SubscriptionInitialPosition.Earliest);
+ }
+
// Make sure they are serializable.
checkState(
isSerializable(deserializationSchema),
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index de1595a..3346fed 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import java.time.Duration;
@@ -232,6 +233,13 @@ public final class PulsarSourceOptions {
code("StartCursor"))
.build());
+ public static final ConfigOption<SubscriptionInitialPosition>
PULSAR_INITIAL_CURSOR =
+ ConfigOptions.key(SOURCE_CONFIG_PREFIX + "initialCursor")
+ .enumType(SubscriptionInitialPosition.class)
+ .defaultValue(SubscriptionInitialPosition.Latest)
+ .withDescription(
+ Description.builder().text("Consumer initial
position.").build());
+
///////////////////////////////////////////////////////////////////////////////
//
// The configuration for ConsumerConfigurationData part.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 2b47297..765db30 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -40,6 +41,7 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
@@ -68,6 +70,7 @@ public class SourceConfiguration extends PulsarConfiguration {
private final boolean enableSchemaEvolution;
private final boolean enableMetrics;
private final boolean resetSubscriptionCursor;
+ private final SubscriptionInitialPosition subscriptionInitialPosition;
public SourceConfiguration(Configuration configuration) {
super(configuration);
@@ -87,6 +90,7 @@ public class SourceConfiguration extends PulsarConfiguration {
this.enableMetrics =
get(PULSAR_ENABLE_SOURCE_METRICS) &&
get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
this.resetSubscriptionCursor = get(PULSAR_RESET_SUBSCRIPTION_CURSOR);
+ this.subscriptionInitialPosition = get(PULSAR_INITIAL_CURSOR);
}
/** The capacity of the element queue in the source reader. */
@@ -209,6 +213,11 @@ public class SourceConfiguration extends PulsarConfiguration {
return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() +
")";
}
+ /** The initial position for the subscription. */
+ public SubscriptionInitialPosition getInitialPosition() {
+ return subscriptionInitialPosition;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -234,7 +243,8 @@ public class SourceConfiguration extends PulsarConfiguration {
&& allowKeySharedOutOfOrderDelivery ==
that.allowKeySharedOutOfOrderDelivery
&& enableSchemaEvolution == that.enableSchemaEvolution
&& enableMetrics == that.enableMetrics
- && resetSubscriptionCursor == that.resetSubscriptionCursor;
+ && resetSubscriptionCursor == that.resetSubscriptionCursor
+ && subscriptionInitialPosition ==
that.subscriptionInitialPosition;
}
@Override
@@ -254,6 +264,7 @@ public class SourceConfiguration extends PulsarConfiguration {
allowKeySharedOutOfOrderDelivery,
enableSchemaEvolution,
enableMetrics,
- resetSubscriptionCursor);
+ resetSubscriptionCursor,
+ subscriptionInitialPosition);
}
}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 4837b4d..550bb7d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -220,7 +220,8 @@ public class PulsarSourceEnumerator
startCursor.position(partition.getTopic(),
partition.getPartitionId());
try {
- //If resetSubscriptionCursor is set to true, the position is
reset to the position specified by StartCursor each time
+ // If resetSubscriptionCursor is set to true, the position is
reset to the position
+ // specified by StartCursor each time
if (sourceConfiguration.isResetSubscriptionCursor()) {
position.setupSubPosition(pulsarClient, topic,
subscriptionName);
}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 2b196e3..6e76e98 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -313,6 +313,9 @@ public class PulsarPartitionSplitReader
consumerBuilder.keySharedPolicy(policy);
}
+ // set initial position
+
consumerBuilder.subscriptionInitialPosition(sourceConfiguration.getInitialPosition());
+
// Create the consumer configuration by using common utils.
Consumer<byte[]> consumer = consumerBuilder.subscribe();