Hi Pulsar community,

Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883

Thanks,
Zixuan

-----

Discussion thread:
https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl

## Motivation

Currently, the Pulsar client supports setting `startMessageId` for
Consumer and Reader, and also supports reading the message at the
`startMessageId` position.

Assume we have four messages with IDs 1, 2, 3, and 4 in the topic:

- When we set `earliest` as the `startMessageId` value, the first message
we get is the one with ID 1
- When we set `latest` as the `startMessageId` value, we do not get any of
the existing messages

Sometimes we want the first read to return the message with ID 4. The
client currently offers only one way to do this:

```
Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .startMessageId(MessageId.latest)
        .startMessageIdInclusive()
        .create();

// Must be called before readNext(): with startMessageIdInclusive() enabled,
// this call performs the seek to the latest message.
reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
```

Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
the correct message (ID 4), because `hasMessageAvailable()` performs a seek
when `startMessageIdInclusive()` is enabled.

This approach is confusing. Handling this on the broker side would make
things easier.

## Goal

This PIP proposes support for reading the message at the `startMessageId`
position on the broker side:

- Add to `Consumer`
- Add to `Reader`

## Implementation

### Protocol

Add a `start_message_id_inclusive` field to `CommandSubscribe` to
determine whether to read the message at the `startMessageId` position:

```
message CommandSubscribe {
    // some fields

    // If specified, the subscription will read the message from the start
    // message id position.
    optional bool start_message_id_inclusive = 20 [default = false];
}
```

### ManagedCursorImpl

Add a check in
`org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.

We only need to handle the case where `startMessageId` is
`MessageId.latest`. If `start_message_id_inclusive` is `true`, we take the
latest position from the ledger as the `readPosition` value; otherwise, if
`start_message_id_inclusive` is `false`, we take the position after the
latest position as the `readPosition` value.
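The rule above can be illustrated with a minimal sketch. It is not the
actual `ManagedCursorImpl` code: the class and method names are
hypothetical, and a position is simplified to a single entry id instead of
a (ledger id, entry id) pair.

```java
// Hypothetical sketch of the read-position rule for MessageId.latest.
// Names are illustrative only; a position is reduced to a plain entry id.
public class ReadPositionSketch {

    /**
     * Returns the initial read position when startMessageId is "latest".
     *
     * @param lastEntryId             id of the latest entry in the ledger
     * @param startMessageIdInclusive value of start_message_id_inclusive
     */
    public static long initialReadPosition(long lastEntryId, boolean startMessageIdInclusive) {
        // Inclusive: start at the latest entry itself.
        // Exclusive: start right after it, so only new messages are read.
        return startMessageIdInclusive ? lastEntryId : lastEntryId + 1;
    }

    public static void main(String[] args) {
        long lastEntryId = 4; // topic holds messages 1, 2, 3, 4
        System.out.println(initialReadPosition(lastEntryId, true));
        System.out.println(initialReadPosition(lastEntryId, false));
    }
}
```

With the four-message topic from the Motivation section, the inclusive
case starts reading at message 4, while the exclusive case starts at the
next position, which does not exist yet.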

### Client

The `Consumer` and `Reader` support setting the
`start_message_id_inclusive` value in the `CommandSubscribe` command.

### Compatibility

This feature is both backward and forward compatible, which means users
can use any client with any broker.

Note that users can still read the message at the latest position by
calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
call is unnecessary when using both the new client and the new broker.
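As a rough illustration of the compatibility matrix, the sketch below
assumes standard protobuf behaviour: an old broker skips the unknown
field, and a new broker sees the default `false` when an old client omits
it. The class and method names are hypothetical.

```java
// Hypothetical sketch of the client/broker compatibility matrix.
// It only models whether the broker itself handles the inclusive read.
public class CompatibilitySketch {

    /** True only when the client sends the field and the broker reads it. */
    public static boolean brokerHandlesInclusive(boolean newClient, boolean newBroker) {
        return newClient && newBroker;
    }

    public static void main(String[] args) {
        boolean[] options = {false, true};
        for (boolean newClient : options) {
            for (boolean newBroker : options) {
                System.out.printf("newClient=%b newBroker=%b brokerSideInclusive=%b%n",
                        newClient, newBroker, brokerHandlesInclusive(newClient, newBroker));
            }
        }
    }
}
```

In every combination other than new client plus new broker, the existing
`reader.hasMessageAvailable()` call remains the way to read the latest
message.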
