syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921757700
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java:
##########
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
* code</a> for understanding pulsar internal logic.
*
* @param messageId The message id for start position.
- * @param inclusive Should we include the start message id in consuming result.
+ * @param inclusive Whether we include the start message id in consuming result. This works only
+ * if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+ * MessageId#latest}.
*/
public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
- if (inclusive) {
- this.messageId = messageId;
+ MessageIdImpl idImpl = unwrapMessageId(messageId);
+ if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) {
+ this.messageId = idImpl;
} else {
- checkState(
- messageId instanceof MessageIdImpl,
- "We only support normal message id and batch message id.");
- MessageIdImpl id = (MessageIdImpl) messageId;
- if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
- this.messageId = messageId;
- } else {
- this.messageId =
- new MessageIdImpl(
- id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
- }
+ this.messageId = nextMessageId(idImpl);
Review Comment:
Yep. We just reuse the same logic here in favor of DRY.
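
For readers without the full PR at hand, here is a minimal, self-contained sketch of the shared logic the diff appears to factor out. It is not the actual Flink or Pulsar code: `SimpleMessageId`, `EARLIEST`, `LATEST`, and `resolveStart` are hypothetical stand-ins so the example runs without the Pulsar client, and the sentinel values are chosen only for illustration. The body of `nextMessageId` is inferred from the removed lines above (same ledger and partition, entry id plus one); the real helper in the PR may handle more cases, such as batch message ids.

```java
import java.util.Objects;

public class StartCursorSketch {

    /** Hypothetical stand-in for Pulsar's MessageIdImpl (ledger id, entry id, partition index). */
    static final class SimpleMessageId {
        final long ledgerId;
        final long entryId;
        final int partitionIndex;

        SimpleMessageId(long ledgerId, long entryId, int partitionIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SimpleMessageId)) {
                return false;
            }
            SimpleMessageId other = (SimpleMessageId) o;
            return ledgerId == other.ledgerId
                    && entryId == other.entryId
                    && partitionIndex == other.partitionIndex;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId, partitionIndex);
        }
    }

    // Illustrative sentinels standing in for MessageId.earliest and MessageId.latest.
    static final SimpleMessageId EARLIEST = new SimpleMessageId(-1L, -1L, -1);
    static final SimpleMessageId LATEST =
            new SimpleMessageId(Long.MAX_VALUE, Long.MAX_VALUE, -1);

    /** Assumed behavior: the next entry in the same ledger and partition. */
    static SimpleMessageId nextMessageId(SimpleMessageId id) {
        return new SimpleMessageId(id.ledgerId, id.entryId + 1, id.partitionIndex);
    }

    /** Mirrors the constructor branch in the diff: only a concrete, exclusive id is advanced. */
    static SimpleMessageId resolveStart(SimpleMessageId id, boolean inclusive) {
        if (EARLIEST.equals(id) || LATEST.equals(id) || inclusive) {
            return id;
        }
        return nextMessageId(id);
    }

    public static void main(String[] args) {
        SimpleMessageId start = new SimpleMessageId(5L, 10L, 0);
        System.out.println(resolveStart(start, true).entryId);
        System.out.println(resolveStart(start, false).entryId);
        System.out.println(resolveStart(EARLIEST, false).equals(EARLIEST));
    }
}
```

With the sentinel check and the inclusive flag folded into one condition, the exclusive case becomes a single call to the shared helper, which is the DRY point made above.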