This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fd83d7  support 'startMessageOffset' for RocketMQSourceFunction (#52)
8fd83d7 is described below

commit 8fd83d71bfd0ca55a2042b4484149f2553ee0ee0
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 22 13:10:50 2022 +0800

    support 'startMessageOffset' for RocketMQSourceFunction (#52)
---
 .../java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 4ea0d1f..db2ab15 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -369,7 +369,7 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
         if (startMessageOffset == DEFAULT_START_MESSAGE_OFFSET) {
             // fetchConsumeOffset from broker
             offset = consumer.fetchConsumeOffset(mq, false);
-            if (!restored || offset < 0) {
+            if (!restored && offset < 0) {
                 String initialOffset =
                         props.getProperty(
                                 RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);

Reply via email to