This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit da92a64a12f4de3b352f29182a4565cc4ac23ae4 Author: JerryTaoTao <[email protected]> AuthorDate: Thu Jan 21 19:19:19 2021 +0800 [rocketmq-connector-flink] rebalance cause offset rollback to long time ago (#672) Co-authored-by: hzyuemeng1 <[email protected]> --- .../org/apache/rocketmq/flink/RocketMQSource.java | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 35c5122..72783a8 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; +import org.apache.flink.util.Preconditions; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.SimpleCounter; @@ -120,6 +121,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> if (restoredOffsets == null) { restoredOffsets = new ConcurrentHashMap<>(); } + + //use restoredOffsets to init offset table. + initOffsetTableFromRestoredOffsets(); + if (pendingOffsetsToCommit == null) { pendingOffsetsToCommit = new LinkedMap(); } @@ -252,13 +257,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> Long offset = offsetTable.get(mq); // restoredOffsets(unionOffsetStates) is the restored global union state; // should only snapshot mqs that actually belong to us - if (restored && offset == null) { - offset = restoredOffsets.get(mq); - } if (offset == null) { // fetchConsumeOffset from broker offset = consumer.fetchConsumeOffset(mq, false); - if (offset < 0) { + if (!restored || offset < 0) { String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST); switch (initialOffset) { case CONSUMER_OFFSET_EARLIEST: @@ -318,6 +320,16 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> } } + public void initOffsetTableFromRestoredOffsets() { + Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null"); + restoredOffsets.forEach((mq, offset) -> { + if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) { + offsetTable.put(mq, offset); + } + }); + log.info("init offset table from restoredOffsets successful.", offsetTable); + } + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // called when a snapshot for a checkpoint is requested
