This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit c9564d6d2b122345889d33b02e2d4ae5098a675c Author: MeYJ <[email protected]> AuthorDate: Mon Jun 3 11:33:38 2019 +0800 fix(module): fix load wrong offset from savepoint (#288) --- src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 9940e8e..ccd6bb4 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -320,7 +320,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) { // unionOffsetStates is the restored global union state; // should only snapshot mqs that actually belong to us - restoredOffsets.put(mqOffsets.f0, mqOffsets.f1); + if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) { + restoredOffsets.put(mqOffsets.f0, mqOffsets.f1); + } } LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets); } else {
