Hi Pulsar community,

Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883

Thanks,
Zixuan

-----

Discussion thread: https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl

## Motivation

Currently, the Pulsar client supports setting `startMessageId` for Consumer and Reader, and also supports reading the message at the `startMessageId` position.

Assume we have four messages with ids 1, 2, 3, 4 in the topic:

- When we set `earliest` as the `startMessageId` value, we can get the message with id 1
- When we set `latest` as the `startMessageId` value, we can't get any message

Sometimes we want the first read to return the message with id 4. Today there is only one way to do this in the client:

```
Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .startMessageId(MessageId.latest)
        .startMessageIdInclusive()
        .create();

// Must be called before readNext(); it triggers the seek to the latest message.
reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
```

Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns the correct message (id 4), because `hasMessageAvailable()` includes a seek action when `startMessageIdInclusive()` is enabled. This approach is confusing. Doing this on the broker side would make things easier.

## Goal

This PIP proposes support for reading the message at the `startMessageId` position on the broker side:

- Add to `Consumer`
- Add to `Reader`

## Implementation

### Protocol

Add a `start_message_id_inclusive` field to `CommandSubscribe` to determine whether to read the message at the `startMessageId` position:

```
message CommandSubscribe {
    // some fields

    // If specified, the subscription will read the message from the start message id position.
    optional bool start_message_id_inclusive = 20 [default = false];
}
```

### ManagedCursorImpl

Add a check in `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
We only need to handle the case where `startMessageId` is `MessageId.latest`: if `start_message_id_inclusive` is `true`, we use the latest position in the ledger as the `readPosition` value; otherwise, if `start_message_id_inclusive` is `false`, we use the position after the latest position as the `readPosition` value.

### Client

The `Consumer` and `Reader` support setting the `start_message_id_inclusive` value in the `CommandSubscribe` command.

### Compatibility

This feature is both backward and forward compatible, which means users can use any client with any broker.

Note that users can still read the message at the latest position by calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this call can be omitted when using the new client and the new broker.
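
As a rough illustration of the `readPosition` rule described in the ManagedCursorImpl section, here is a minimal sketch. It is a toy model, not the actual `ManagedCursorImpl` code: the class `StartPositionSketch`, the method `initialReadPosition`, and the use of a plain `long` entry index in place of a real ledger position are all assumptions made for this example.

```java
// Toy model only: positions are plain entry indexes, not Pulsar's real position type.
final class StartPositionSketch {

    /**
     * Picks the first entry a new cursor would read when startMessageId is "latest".
     *
     * @param lastEntryId             index of the last entry currently in the topic (hypothetical model)
     * @param startMessageIdInclusive value of the proposed start_message_id_inclusive flag
     * @return the entry index to use as the initial read position
     */
    static long initialReadPosition(long lastEntryId, boolean startMessageIdInclusive) {
        if (startMessageIdInclusive) {
            // Inclusive: start at the latest existing entry so that it is delivered.
            return lastEntryId;
        }
        // Exclusive (the default): start just past the latest entry,
        // so only messages published afterwards are delivered.
        return lastEntryId + 1;
    }

    public static void main(String[] args) {
        long lastEntryId = 4; // the topic holds message ids 1..4, as in the Motivation example
        System.out.println(initialReadPosition(lastEntryId, true));  // prints 4
        System.out.println(initialReadPosition(lastEntryId, false)); // prints 5
    }
}
```

With the flag set, the first read returns message id 4 without a prior `hasMessageAvailable()` call; with the default value, the cursor waits for a message published after id 4, which matches the current `latest` behavior.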