This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cd44b5894e [Fix] [Connector] Rocketmq source startOffset greater than
endOffset error (#6287)
cd44b5894e is described below
commit cd44b5894e962892e991b2dcf8facddb09b56ac4
Author: 王渔 <[email protected]>
AuthorDate: Mon Jan 29 13:40:16 2024 +0800
[Fix] [Connector] Rocketmq source startOffset greater than endOffset error
(#6287)
---
.../seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index ce841a4bf0..6630d495f9 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -149,7 +149,10 @@ public class RocketMqSourceSplitEnumerator
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
splits.forEach(
split -> {
- split.setStartOffset(split.getEndOffset() + 1);
+ split.setStartOffset(
+ Math.min(
+ split.getEndOffset() + 1,
+ listOffsets.get(split.getMessageQueue())));
split.setEndOffset(listOffsets.get(split.getMessageQueue()));
});
return splits.stream()