This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 65e30294aaaa25992752853a04fdfa69cd3c0e86 Author: Jennifer-sarah <[email protected]> AuthorDate: Thu Mar 21 13:43:12 2019 +0800 update consumer offset after checkpoint completed update consumer offset after checkpoint completed --- .../org/apache/rocketmq/flink/RocketMQSource.java | 27 ++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 8e8e57b..14b8042 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -60,7 +61,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getLong; * Otherwise, the source doesn't provide any reliability guarantees. */ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> - implements CheckpointedFunction, ResultTypeQueryable<OUT> { + implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> { private static final long serialVersionUID = 1L; @@ -126,15 +127,15 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> final Object lock = context.getCheckpointLock(); int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, - RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); + RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG); int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE, - RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); + RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE, - RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE); + RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE); pullConsumerScheduleService.setPullThreadNums(pullPoolSize); pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() { @@ -229,7 +230,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> break; case CONSUMER_OFFSET_TIMESTAMP: offset = consumer.searchOffset(mq, getLong(props, - RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); + RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); break; default: throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO."); @@ -286,7 +287,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> if (LOG.isDebugEnabled()) { LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", - offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); + offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); } for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { @@ -304,7 +305,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> LOG.debug("initialize State ..."); this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>( - OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { }))); + OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { }))); this.restored = context.isRestored(); @@ -327,4 +328,16 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> public TypeInformation<OUT> getProducedType() { return schema.getProducedType(); } + + @Override + public void notifyCheckpointComplete(long l) throws Exception { + if (!runningChecker.isRunning()) { + LOG.debug("notifyCheckpointComplete() called on closed source; returning null."); + return; + } + + for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { + consumer.updateConsumeOffset(entry.getKey(), entry.getValue()); + } + } }
