This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 6656964  [Hotfix] RocketMQSourceFunction persists offset in 
notifyCheckpointComplete
     new 638b7fa  Merge pull request #42 from SteNicholas/hotfix-persist-offset
6656964 is described below

commit 6656964b95313b98f8147fefa856a9475db8141c
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jul 20 17:42:01 2022 +0800

    [Hotfix] RocketMQSourceFunction persists offset in notifyCheckpointComplete
---
 .../java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 6fb7875..ac3e4d6 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -564,6 +564,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
 
         for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
             consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
+            
consumer.getOffsetStore().persist(consumer.queueWithNamespace(entry.getKey()));
         }
     }
 }

Reply via email to