This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 4721e3e  support 'startMessageOffset' for RocketMQSourceFunction (#51)
4721e3e is described below

commit 4721e3e3ba83585b210d8dde737cbf829beac677
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 22 12:32:35 2022 +0800

    support 'startMessageOffset' for RocketMQSourceFunction (#51)
---
 src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java | 6 +++++-
 src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java  | 2 ++
 .../org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java    | 5 +++--
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java 
b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index 8655644..dfdef29 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -21,6 +21,8 @@ package org.apache.rocketmq.flink.common;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
+
 /** Includes config options of RocketMQ connector type. */
 public class RocketMQOptions {
 
@@ -43,7 +45,9 @@ public class RocketMQOptions {
             ConfigOptions.key("sql").stringType().noDefaultValue();
 
     public static final ConfigOption<Long> OPTIONAL_START_MESSAGE_OFFSET =
-            
ConfigOptions.key("startMessageOffset").longType().defaultValue(-1L);
+            ConfigOptions.key("startMessageOffset")
+                    .longType()
+                    .defaultValue(DEFAULT_START_MESSAGE_OFFSET);
 
     public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
             ConfigOptions.key("startTimeMs").longType().defaultValue(-1L);
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index bcd0783..936beb8 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -81,6 +81,8 @@ public class RocketMQConfig {
 
     public static final String CONSUMER_START_MESSAGE_OFFSET = 
"consumer.start.message.offset";
 
+    public static final long DEFAULT_START_MESSAGE_OFFSET = -1;
+
     public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
     public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
 
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 1d88c69..4ea0d1f 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -80,6 +80,7 @@ import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EA
 import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
 import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
 import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
 import static 
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
 import static 
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
 
@@ -138,7 +139,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                 props.containsKey(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET)
                         ? Long.parseLong(
                                 
props.getProperty(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET))
-                        : -1;
+                        : DEFAULT_START_MESSAGE_OFFSET;
 
         Validate.notEmpty(topic, "Consumer topic can not be empty");
         Validate.notEmpty(group, "Consumer group can not be empty");
@@ -365,7 +366,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         }
         // restoredOffsets(unionOffsetStates) is the restored global union 
state;
         // should only snapshot mqs that actually belong to us
-        if (startMessageOffset == -1) {
+        if (startMessageOffset == DEFAULT_START_MESSAGE_OFFSET) {
             // fetchConsumeOffset from broker
             offset = consumer.fetchConsumeOffset(mq, false);
             if (!restored || offset < 0) {

Reply via email to