This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/main by this push:
new 0f7b744 [FLINK-37299] Stateless startup cannot continue consuming
from current consumer group location
0f7b744 is described below
commit 0f7b744fe938eaba7afd441faa6c3fe915b84381
Author: yebai1105 <[email protected]>
AuthorDate: Sat Oct 18 02:16:48 2025 +0800
[FLINK-37299] Stateless startup cannot continue consuming from current
consumer group location
---
.../connector/pulsar/source/enumerator/PulsarSourceEnumerator.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 57bb72b..4837b4d 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -220,7 +220,10 @@ public class PulsarSourceEnumerator
startCursor.position(partition.getTopic(),
partition.getPartitionId());
try {
- position.setupSubPosition(pulsarClient, topic,
subscriptionName);
+ //If resetSubscriptionCursor is set to true, the position is
reset to the position specified by StartCursor each time
+ if (sourceConfiguration.isResetSubscriptionCursor()) {
+ position.setupSubPosition(pulsarClient, topic,
subscriptionName);
+ }
} catch (PulsarClientException e) {
throw new FlinkRuntimeException(e);
}