This way is right. PengHui Li <peng...@apache.org> 于2022年4月11日周一 09:12写道:
> > regarding the compatibility, should we check the protocol version at the > client side? > The old version version doesn't support `start_message_id_inclusive` which > means the > client side still needs to do the seek operation while requesting an old > version broker. > > Oh sorry, my fault. If the broker side does not support > `start_message_id_inclusive`, users > need to call `reader.hasMessageAvailable()` first. > > +1 for the proposal. > > Penghui > > On Mon, Apr 11, 2022 at 9:10 AM PengHui Li <peng...@apache.org> wrote: > > > Hi zixuan, > > > > The proposal looks good, > > regarding the compatibility, should we check the protocol version at the > > client side? > > The old version version doesn't support `start_message_id_inclusive` > which > > means the > > client side still needs to do the seek operation while requesting an old > > version broker. > > > > > Notice that the users still can read the message of the latest position > > by > > call `reader.hasMessageAvailable()` before `reader.readNext()`, but this > > call can be ignored when using the new client and the new broker. > > > > For this part, I don't think it should be a notice, We do not > deliberately > > modify current behavior, > > just fix the problem if users don't want to call > > reader.hasMessageAvailable() because they know > > topic has at least one message. Add notice here, it looks like after this > > proposal, we are not > > suggesting to use hasMessageAvailable() any more. > > > > Penghui > > > > On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <node...@gmail.com> wrote: > > > >> Hi Pulsar community, > >> > >> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883 > >> > >> Thanks, > >> Zixuan > >> > >> ----- > >> > >> Discussion thread: > >> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl > >> > >> ## Motivation > >> > >> Currently, the Pulsar-client supports setting the `startMessageId` for > >> Consumer and Reader, and also supports reading the message of > >> `startMessageId` position. > >> > >> Assume, we have two message id 1,2,3,4 in the topic: > >> > >> - When we set `earliest` as `startMessageId` value, we can get the > message > >> of message id 1 > >> - When we set `latest` as `startMessageId` value, we can't get any > message > >> > >> Sometimes we want to read the message id 4 for the first time, we have > >> only one approach in client: > >> > >> ``` > >> Reader<byte[]> reader = pulsarClient.newReader() > >> .topic(topicName) > >> .subscriptionName(subscriptionName) > >> .startMessageId(MessageId.latest) > >> .startMessageIdInclusive() > >> .create(); > >> > >> reader.hasMessageAvailable(); > >> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); > >> ``` > >> > >> Call `reader.hasMessageAvailable()` before `reader.readNext()` can get > the > >> correct message id 4, which include seek action when the > >> `startMessageIdInclusive()` is enabled. > >> > >> This approach is confusing. If we do this on the broker side, it will > >> make things easier. > >> > >> ## Goal > >> > >> This PIP proposes support for reading the message of `startMessageId` > >> position on the broker side: > >> > >> - Add to `Consumer` > >> - Add to `Reader` > >> > >> ## Implementation > >> > >> ### Protocol > >> > >> Add a `start_message_id_inclusive` field to `CommandSubscribe` for > >> determine whether to read the message of `startMessageId` position: > >> > >> ``` > >> message CommandSubscribe { > >> // some fields > >> > >> // If specified, the subscription will read the message from the > start > >> message id position. > >> optional bool start_message_id_inclusive = 20 [default = false]; > >> } > >> ``` > >> > >> ### ManagedCursorImpl > >> > >> Add a check in > >> > >> > `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`. > >> > >> > >> We only need to care that the `startMessageId` is `MessageId.latest` and > >> the`start_message_id_inclusive` is `true`, we get latest position from > >> ledger as `readPosition` value, otherwise if > >> the`start_message_id_inclusive` is `false`, get next position of the > >> latest position as `readPosition` value. > >> > >> ### Client > >> > >> The `Consumer` and `Reader` support setting the > >> `start_message_id_inclusive` value to `CommandSubscribe` command. > >> > >> ### Compatibility > >> > >> This feature can have both backward and forward compatibility, this > means > >> the users can use any client to request any broker. > >> > >> Notice that the users still can read the message of the latest position > by > >> call `reader.hasMessageAvailable()` before `reader.readNext()`, but this > >> call can be ignored when using the new client and the new broker. > >> > > >