This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 58d07fbe52968e43e7eb041045bbcc194bd904b7 Author: tangyoupeng <[email protected]> AuthorDate: Mon Jun 3 11:10:17 2019 +0800 Fix getting wrong offset bug when the source restart (#190) --- src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 8e8e57b..9940e8e 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.Validate; @@ -289,6 +290,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); } + // remove the unassigned queues in order to avoid read the wrong offset when the source restart + Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic); + offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey())); + for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue())); }
