This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 8531921af25bd94a32f6fa149f3147fe8ad370be Author: Jennifer-sarah <[email protected]> AuthorDate: Fri Mar 22 08:23:08 2019 +0800 fix concurrent checkpoint bug fix concurrent checkpoint bug --- .../org/apache/rocketmq/flink/RocketMQSource.java | 39 ++++++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 5b76e54..b6e68f8 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -19,11 +19,13 @@ package org.apache.rocketmq.flink; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.lang.Validate; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -43,6 +45,7 @@ import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullTaskCallback; import org.apache.rocketmq.client.consumer.PullTaskContext; +import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -78,6 +81,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates; private Map<MessageQueue, Long> offsetTable; private Map<MessageQueue, Long> restoredOffsets; + /** Data for pending but uncommitted offsets. */ + private LinkedMap pendingOffsetsToCommit; private Properties props; private String topic; @@ -113,6 +118,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> if (restoredOffsets == null) { restoredOffsets = new ConcurrentHashMap<>(); } + if (pendingOffsetsToCommit == null) { + pendingOffsetsToCommit = new LinkedMap(); + } runningChecker = new RunningChecker(); @@ -263,6 +271,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> offsetTable.clear(); restoredOffsets.clear(); + pendingOffsetsToCommit.clear(); } @Override @@ -291,13 +300,18 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> unionOffsetStates.clear(); + HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size()); + for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue())); + currentOffsets.put(entry.getKey(), entry.getValue()); } + pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); + if (LOG.isDebugEnabled()) { LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", - offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); + offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); } } @@ -337,14 +351,33 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - // callback when checkpoint complete + // callback when checkpoint complete if (!runningChecker.isRunning()) { LOG.debug("notifyCheckpointComplete() called on closed source; returning null."); return; } - for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { + final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); + return; + } + + Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap); + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingOffsetsToCommit.remove(0); + } + + if (offsets == null || offsets.size() == 0) { + LOG.debug("Checkpoint state was empty."); + return; + } + + for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) { consumer.updateConsumeOffset(entry.getKey(), entry.getValue()); } + } }
