This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit c9564d6d2b122345889d33b02e2d4ae5098a675c
Author: MeYJ <[email protected]>
AuthorDate: Mon Jun 3 11:33:38 2019 +0800

    fix(module): fix load wrong offset from savepoint (#288)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 9940e8e..ccd6bb4 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -320,7 +320,9 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
             for (Tuple2<MessageQueue, Long> mqOffsets : 
unionOffsetStates.get()) {
                 // unionOffsetStates is the restored global union state;
                 // should only snapshot mqs that actually belong to us
-                restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+                if (!restoredOffsets.containsKey(mqOffsets.f0) || 
restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
+                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+                }
             }
             LOG.info("Setting restore state in the consumer. Using the 
following offsets: {}", restoredOffsets);
         } else {

Reply via email to