This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 34f60629 [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
34f60629 is described below

commit 34f6062936ae993fa32215e6a170402f299ca89f
Author: Slideee <[email protected]>
AuthorDate: Wed Sep 28 16:57:57 2022 +0800

    [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
---
 .../src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 78d2457e..d74761b7 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -146,6 +146,12 @@ public class RmqSourceTask extends SourceTask {
                     PullResult pullResult = consumer.pull(taskTopicConfig, "*",
                         this.mqOffsetMap.get(taskTopicConfig), 32);
                     switch (pullResult.getPullStatus()) {
+                        case OFFSET_ILLEGAL: {
+                            if (this.mqOffsetMap.get(taskTopicConfig) < 
pullResult.getNextBeginOffset()) {
+                                this.mqOffsetMap.put(taskTopicConfig, 
pullResult.getNextBeginOffset());
+                            }
+                            break;
+                        }
                         case FOUND: {
                             this.mqOffsetMap.put(taskTopicConfig, 
pullResult.getNextBeginOffset());
                             List<MessageExt> msgs = 
pullResult.getMsgFoundList();

Reply via email to