This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 6656964 [Hotfix] RocketMQSourceFunction persists offset in
notifyCheckpointComplete
new 638b7fa Merge pull request #42 from SteNicholas/hotfix-persist-offset
6656964 is described below
commit 6656964b95313b98f8147fefa856a9475db8141c
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jul 20 17:42:01 2022 +0800
[Hotfix] RocketMQSourceFunction persists offset in notifyCheckpointComplete
---
.../java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java | 1 +
1 file changed, 1 insertion(+)
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 6fb7875..ac3e4d6 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -564,6 +564,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
+
consumer.getOffsetStore().persist(consumer.queueWithNamespace(entry.getKey()));
}
}
}