This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 34f60629 [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask
pulls messages (#331)
34f60629 is described below
commit 34f6062936ae993fa32215e6a170402f299ca89f
Author: Slideee <[email protected]>
AuthorDate: Wed Sep 28 16:57:57 2022 +0800
[ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages
(#331)
---
.../src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 78d2457e..d74761b7 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -146,6 +146,12 @@ public class RmqSourceTask extends SourceTask {
PullResult pullResult = consumer.pull(taskTopicConfig, "*",
this.mqOffsetMap.get(taskTopicConfig), 32);
switch (pullResult.getPullStatus()) {
+ case OFFSET_ILLEGAL: {
+ if (this.mqOffsetMap.get(taskTopicConfig) <
pullResult.getNextBeginOffset()) {
+ this.mqOffsetMap.put(taskTopicConfig,
pullResult.getNextBeginOffset());
+ }
+ break;
+ }
case FOUND: {
this.mqOffsetMap.put(taskTopicConfig,
pullResult.getNextBeginOffset());
List<MessageExt> msgs =
pullResult.getMsgFoundList();