This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 422909f  [ISSUE #76] Fix bug when the job restore from ck (#77)
422909f is described below

commit 422909f352808b15f9eef1e699af0bd0fff17a88
Author: 高思伟 <[email protected]>
AuthorDate: Wed Dec 7 10:37:22 2022 +0800

    [ISSUE #76] Fix bug when the job restore from ck (#77)
    
    Fix bug when the job restore from ck
    
    Co-authored-by: 高思伟 <[email protected]>
---
 .../apache/rocketmq/flink/legacy/RocketMQSourceFunction.java | 12 +++++-------
 .../legacy/sourceFunction/RocketMQSourceFunctionTest.java    |  3 ++-
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 97037a1..fa57839 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -176,10 +176,6 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         if (restoredOffsets == null) {
             restoredOffsets = new ConcurrentHashMap<>();
         }
-
-        // use restoredOffsets to init offset table.
-        initOffsetTableFromRestoredOffsets();
-
         if (pendingOffsetsToCommit == null) {
             pendingOffsetsToCommit = new LinkedMap();
         }
@@ -250,7 +246,9 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                 RocketMQUtils.allocate(totalQueues, taskNumber, 
ctx.getIndexOfThisSubtask());
         // If the job recovers from the state, the state has already contained 
the offsets of last
         // commit.
-        if (!restored) {
+        if (restored) {
+            initOffsetTableFromRestoredOffsets(messageQueues);
+        } else {
             initOffsets(messageQueues);
         }
     }
@@ -539,11 +537,11 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         }
     }
 
-    public void initOffsetTableFromRestoredOffsets() {
+    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> 
messageQueues) {
         Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be 
null");
         restoredOffsets.forEach(
                 (mq, offset) -> {
-                    if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < 
offset) {
+                    if (messageQueues.contains(mq)) {
                         offsetTable.put(mq, offset);
                     }
                 });
diff --git 
a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
 
b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 8b7b44f..6ef73e6 100644
--- 
a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ 
b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -27,6 +27,7 @@ import 
org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeseria
 
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -78,7 +79,7 @@ public class RocketMQSourceFunctionTest {
         map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
         setFieldValue(source, "restoredOffsets", map);
         setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
-        source.initOffsetTableFromRestoredOffsets();
+        source.initOffsetTableFromRestoredOffsets(new 
ArrayList<>(map.keySet()));
         Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source, 
"offsetTable");
         for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
             assertEquals(offsetTable.containsKey(entry.getKey()), true);

Reply via email to