This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f7b744  [FLINK-37299] Stateless startup cannot continue consuming from current consumer group location
0f7b744 is described below

commit 0f7b744fe938eaba7afd441faa6c3fe915b84381
Author: yebai1105 <[email protected]>
AuthorDate: Sat Oct 18 02:16:48 2025 +0800

    [FLINK-37299] Stateless startup cannot continue consuming from current consumer group location
---
 .../connector/pulsar/source/enumerator/PulsarSourceEnumerator.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 57bb72b..4837b4d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -220,7 +220,10 @@ public class PulsarSourceEnumerator
                     startCursor.position(partition.getTopic(), partition.getPartitionId());
 
             try {
-                position.setupSubPosition(pulsarClient, topic, subscriptionName);
+                //If resetSubscriptionCursor is set to true, the position is reset to the position specified by StartCursor each time
+                if (sourceConfiguration.isResetSubscriptionCursor()) {
+                    position.setupSubPosition(pulsarClient, topic, subscriptionName);
+                }
             } catch (PulsarClientException e) {
                 throw new FlinkRuntimeException(e);
             }

Reply via email to