This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit f74cc2709208efa82588701554fb30410fd33469
Author: Jennifer-sarah <[email protected]>
AuthorDate: Fri Mar 22 00:48:05 2019 +0800

    Optimizing update offset code logic

    Optimizing update offset code logic
---
 .../org/apache/rocketmq/flink/RocketMQSource.java | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 14b8042..f610efe 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -85,6 +86,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
     private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

     private transient volatile boolean restored;
+    private transient boolean enableCheckpoint;

     public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
         this.schema = schema;
@@ -103,6 +105,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         Validate.notEmpty(topic, "Consumer topic can not be empty");
         Validate.notEmpty(group, "Consumer group can not be empty");

+        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+
         if (offsetTable == null) {
             offsetTable = new ConcurrentHashMap<>();
         }
@@ -243,7 +247,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>

     private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
         offsetTable.put(mq, offset);
-        consumer.updateConsumeOffset(mq, offset);
+        if (!enableCheckpoint) {
+            consumer.updateConsumeOffset(mq, offset);
+        }
     }

     @Override
@@ -285,14 +291,14 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>

         unionOffsetStates.clear();

+        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                 offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
         }
-
-        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
-            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
-        }
     }

     @Override
@@ -330,7 +336,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
     }

     @Override
-    public void notifyCheckpointComplete(long l) throws Exception {
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // consumer.c
         if (!runningChecker.isRunning()) {
             LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
             return;
