SteNicholas commented on code in PR #46:
URL: https://github.com/apache/rocketmq-flink/pull/46#discussion_r990724567
##########
src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java:
##########
@@ -75,6 +70,8 @@
private final long startTime;
private final long startOffset;
+ private final int pollTime;
Review Comment:
```suggestion
private final long pollTime;
```
##########
src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java:
##########
@@ -404,8 +380,7 @@ private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
offsetTable.put(mq, offset);
if (!enableCheckpoint) {
- consumer.updateConsumeOffset(mq, offset);
- consumer.getOffsetStore().persist(consumer.queueWithNamespace(mq));
+ consumer.getOffsetStore().updateOffset(mq, offset, false);
Review Comment:
Why is the offset no longer persisted here? IMO, when checkpointing is disabled the offset should still be persisted after it is updated. cc @ShannonDing @zhouxinyu
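To make the concern concrete, here is a minimal, self-contained sketch of the difference between updating an offset in memory and persisting it. `InMemoryOffsetStore`, its `String` queue keys, and `readPersisted` are hypothetical stand-ins invented for illustration; this is a toy model, not the RocketMQ `OffsetStore` implementation, and it only assumes that an update touches local state while a persist flushes it.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetPersistSketch {

    /** Hypothetical toy stand-in for an offset store; not the RocketMQ class. */
    public static final class InMemoryOffsetStore {
        // Offsets held only in this process.
        private final Map<String, Long> local = new HashMap<>();
        // Stand-in for durable storage (for example, the broker).
        private final Map<String, Long> persisted = new HashMap<>();

        /** Updates the local offset; with increaseOnly, never moves it backwards. */
        public void updateOffset(String queue, long offset, boolean increaseOnly) {
            Long current = local.get(queue);
            if (!increaseOnly || current == null || offset > current) {
                local.put(queue, offset);
            }
        }

        /** Copies the local offset for one queue into the durable map. */
        public void persist(String queue) {
            Long current = local.get(queue);
            if (current != null) {
                persisted.put(queue, current);
            }
        }

        /** Returns the durable offset, or -1 if nothing was persisted yet. */
        public long readPersisted(String queue) {
            return persisted.getOrDefault(queue, -1L);
        }
    }

    public static void main(String[] args) {
        InMemoryOffsetStore store = new InMemoryOffsetStore();

        // Update only: the durable view is unchanged.
        store.updateOffset("queue-0", 42L, false);
        System.out.println("after update:  " + store.readPersisted("queue-0"));

        // Update followed by persist: the durable view catches up.
        store.persist("queue-0");
        System.out.println("after persist: " + store.readPersisted("queue-0"));
    }
}
```

In this toy model the offset is lost on restart unless `persist` runs after `updateOffset`. Whether the same holds in the PR depends on whether the consumer flushes offsets elsewhere (for example, on a timer), which I have not verified.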
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.