This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit c6cb79d2536991367456086228a83a5518c19e02 Merge: 0be1ed7 8531921 Author: Xin Wang <[email protected]> AuthorDate: Sun Jul 7 19:37:50 2019 +0800 Merge pull request #229 from Jennifer-sarah/master update consumer offset after checkpoint completed .../org/apache/rocketmq/flink/RocketMQSource.java | 65 ++++++++++++++++++++-- 1 file changed, 59 insertions(+), 6 deletions(-) diff --cc src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 18277e0,b6e68f8..06eecfb --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@@ -22,10 -23,9 +23,11 @@@ import java.util.HashMap import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.lang.Validate; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@@ -286,17 -300,18 +302,22 @@@ public class RocketMQSource<OUT> extend unionOffsetStates.clear(); - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", - offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); - } + HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size()); + // remove the unassigned queues in order to avoid read the wrong offset when the source restart + Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic); + offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey())); + for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue())); + currentOffsets.put(entry.getKey(), entry.getValue()); + } + + pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", + offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); } }
