This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 4721e3e support 'startMessageOffset' for RocketMQSourceFunction (#51)
4721e3e is described below
commit 4721e3e3ba83585b210d8dde737cbf829beac677
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 22 12:32:35 2022 +0800
support 'startMessageOffset' for RocketMQSourceFunction (#51)
---
src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java | 6 +++++-
src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java | 2 ++
.../org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java | 5 +++--
3 files changed, 10 insertions(+), 3 deletions(-)
diff --git
a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index 8655644..dfdef29 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -21,6 +21,8 @@ package org.apache.rocketmq.flink.common;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
+
/** Includes config options of RocketMQ connector type. */
public class RocketMQOptions {
@@ -43,7 +45,9 @@ public class RocketMQOptions {
ConfigOptions.key("sql").stringType().noDefaultValue();
public static final ConfigOption<Long> OPTIONAL_START_MESSAGE_OFFSET =
-
ConfigOptions.key("startMessageOffset").longType().defaultValue(-1L);
+ ConfigOptions.key("startMessageOffset")
+ .longType()
+ .defaultValue(DEFAULT_START_MESSAGE_OFFSET);
public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
ConfigOptions.key("startTimeMs").longType().defaultValue(-1L);
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index bcd0783..936beb8 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -81,6 +81,8 @@ public class RocketMQConfig {
public static final String CONSUMER_START_MESSAGE_OFFSET =
"consumer.start.message.offset";
+ public static final long DEFAULT_START_MESSAGE_OFFSET = -1;
+
public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 1d88c69..4ea0d1f 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -80,6 +80,7 @@ import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EA
import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
+import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
import static
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
import static
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
@@ -138,7 +139,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
props.containsKey(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET)
? Long.parseLong(
props.getProperty(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET))
- : -1;
+ : DEFAULT_START_MESSAGE_OFFSET;
Validate.notEmpty(topic, "Consumer topic can not be empty");
Validate.notEmpty(group, "Consumer group can not be empty");
@@ -365,7 +366,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
}
// restoredOffsets(unionOffsetStates) is the restored global union
state;
// should only snapshot mqs that actually belong to us
- if (startMessageOffset == -1) {
+ if (startMessageOffset == DEFAULT_START_MESSAGE_OFFSET) {
// fetchConsumeOffset from broker
offset = consumer.fetchConsumeOffset(mq, false);
if (!restored || offset < 0) {