aryemazouz opened a new issue #9301:
URL: https://github.com/apache/pulsar/issues/9301
Currently, ReaderConfigurationData only allows ‘setStartMessageId’ with a
single message ID, and that ID applies to all consumers in the
MultiTopicsReaderImpl.
Would it be possible to support a start message ID per partition / topic?
Today, to avoid losing data, I have to pass MessageId.earliest and then
execute a seek (as my code below does).
The annoying part is that the reader starts receiving messages from Pulsar as
soon as it is created, so until the seek is actually executed I consume events.
I can skip handling them because I track the cursor per topic / partition, but
I still have to consume them (to ensure exactly-once processing).
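A minimal sketch of the per-topic skip check described above, not the actual code from this report. It assumes the tracked cursor stores the last position already handled for each topic, and that positions are comparable (Pulsar's MessageId implements Comparable<MessageId>). The class name `CursorFilter` and the method `isNew` are hypothetical.

```java
import java.util.Map;

// Hypothetical helper: decides whether a message received before the seek
// completes still needs to be handled.
final class CursorFilter {
    private CursorFilter() {}

    /**
     * Returns true when no cursor is tracked for the topic, or when the
     * message position is strictly after the last handled position.
     * Assumption: lastHandled holds the last position already processed.
     */
    static <I extends Comparable<I>> boolean isNew(
            Map<String, I> lastHandled, String topic, I position) {
        I last = lastHandled.get(topic);
        return last == null || position.compareTo(last) > 0;
    }
}
```

With the Pulsar client, `I` would be MessageId and `topic` would come from Message.getTopicName(); messages for which the check returns false are read and dropped.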
It would be nicer if I could pass a mapping from topic to start message ID to
avoid this (falling back to MessageId.earliest when my map has no last
MessageId for a topic).
The best way would be to give the reader configuration a
Function<String, MessageId> that maps Consumer.getTopic to the start message ID.
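As a rough illustration of that Function<String, MessageId> idea, the sketch below builds a topic-to-start-position resolver from a map of tracked positions, with a fallback for unknown topics. It is generic so it compiles without the Pulsar client. With Pulsar, the type parameter would be MessageId and the fallback MessageId.earliest. The class name `StartPositions` and the method `resolver` are hypothetical. No such hook exists in ReaderConfigurationData today, which is what this issue is asking for.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of the Pulsar API.
final class StartPositions {
    private StartPositions() {}

    /**
     * Returns a function that maps a topic / partition name to its start
     * position: the tracked position if one exists, otherwise the fallback.
     */
    static <I> Function<String, I> resolver(Map<String, I> lastKnown, I fallback) {
        return topic -> lastKnown.getOrDefault(topic, fallback);
    }
}
```

The reader would call the function once per internal consumer, passing that consumer's topic name, before subscribing.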
- I am using the key hash feature (Pulsar broker / client version 2.7.0).
- It would also be helpful to add support for multiple topics (by pattern /
list) to the MultiTopicsReaderImpl.
`/**
 * Create reader
 */
private Reader<byte[]> createReader() throws PulsarClientException {
    ReaderConfigurationData<byte[]> cloned = ...;
    // Start from the earliest message; seek() repositions each partition afterwards.
    cloned.setStartMessageId(MessageId.earliest);
    cloned.setReaderName(...);
    // Using key hash feature
    cloned.setKeyHashRanges(Arrays.asList(new Range(myRange.getStart(), myRange.getEnd())));
    Reader<byte[]> reader = client.getShared().createReaderAsync(cloned).join();
    seek(reader);
    return reader;
}
/**
 * Execute seek by partitions
 */
private void seek(Reader reader) throws PulsarClientException {
    if (reader instanceof MultiTopicsReaderImpl) {
        // Partitioned / multi-topic reader: seek each internal consumer separately.
        MultiTopicsReaderImpl multiTopicsReader = (MultiTopicsReaderImpl) reader;
        List<ConsumerImpl> consumers = multiTopicsReader.getMultiTopicsConsumer().getConsumers();
        for (Consumer consumer : consumers) {
            seek(consumer.getTopic(), messageId -> consumer.seek(messageId));
        }
    } else if (reader instanceof ReaderImpl) {
        seek(reader.getTopic(), messageId -> reader.seek(messageId));
    } else {
        throw new IllegalArgumentException("Unknown reader type: " + reader.getClass());
    }
}
/**
 * Find the relevant message ID to seek on the given consumer / reader
 */
private void seek(String topic, ExceptionalConsumer<MessageId> seek) {
    PartitionedTopicCursor<MessageId, Message<T>> partitionCursor =
            cursors.get(topic); // Our internal cursor tracking
    if (partitionCursor == null) {
        cursors.put(topic, new PartitionedTopicCursor(.../*Init new one
        to keep tracking on this partition*/));
        return; // no need to seek
    }
    MessageId seekTo = partitionCursor.getStartFrom();
    seek.accept(seekTo);
}`