This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 8fd83d7 support 'startMessageOffset' for RocketMQSourceFunction (#52)
8fd83d7 is described below
commit 8fd83d71bfd0ca55a2042b4484149f2553ee0ee0
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 22 13:10:50 2022 +0800
support 'startMessageOffset' for RocketMQSourceFunction (#52)
---
.../java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 4ea0d1f..db2ab15 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -369,7 +369,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
if (startMessageOffset == DEFAULT_START_MESSAGE_OFFSET) {
// fetchConsumeOffset from broker
offset = consumer.fetchConsumeOffset(mq, false);
- if (!restored || offset < 0) {
+ if (!restored && offset < 0) {
String initialOffset =
props.getProperty(
RocketMQConfig.CONSUMER_OFFSET_RESET_TO,
CONSUMER_OFFSET_LATEST);