This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4b9f06375e28a5107d4b0108e7478c574bf05cd Author: Yufei Zhang <[email protected]> AuthorDate: Tue Dec 7 20:41:36 2021 +0800 [FLINK-25044][pulsar][test]: fix the messageId overflow when set to latest --- .../source/enumerator/cursor/start/MessageIdStartCursor.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java index 0185bb3..f807960 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java @@ -53,9 +53,13 @@ public class MessageIdStartCursor implements StartCursor { messageId instanceof MessageIdImpl, "We only support normal message id and batch message id."); MessageIdImpl id = (MessageIdImpl) messageId; - this.messageId = - new MessageIdImpl( - id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex()); + if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) { + this.messageId = messageId; + } else { + this.messageId = + new MessageIdImpl( + id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex()); + } } }
