YinY1 commented on issue #25204: URL: https://github.com/apache/pulsar/issues/25204#issuecomment-3839587533
I suspect the bug is caused by `MultiTopicsConsumerImpl.receiveMessageFromConsumer()`. More specifically, the consumer asynchronously receives a batch of messages and then validates the consumer epoch of each message. If `redeliverUnacknowledgedMessages` increments the epoch concurrently, some messages of the batch (which all carry the same epoch) may be filtered out after the epoch is updated, while others are not.

For example, take a batch of messages with epochs [0, 0, 0]. The user calls `redeliverUnacknowledgedMessages`, which updates the epoch to 1, so after that call the user expects to receive only messages with epoch 1. However, in the internal consumer, `messagesFuture` may complete before the user's call, and the callback then iterates over the messages and validates the epoch of each one. `messages[0]` is not filtered out, because the current epoch is still 0. Then the user's call happens and the epoch is updated to 1, so `messages[1..]` are filtered out.

The timeline looks like this:

1. `CONSUMER_EPOCH` = 0
2. `messagesFuture.thenAcceptAsync` runs with message **epochs** [0, 0, 0]
3. `isValidConsumerEpoch(messages[0])` = **true**
4. `redeliverUnacknowledgedMessages()` calls `CONSUMER_EPOCH.incrementAndGet()` (now 1)
5. `isValidConsumerEpoch(messages[1])` = **false**
6. `isValidConsumerEpoch(messages[2])` = **false**
7. `consumer.receive()` returns *messages[0]*, which still carries epoch 0

The relevant code is in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:250-284`.
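The interleaving described above can be reproduced deterministically with a small single-threaded Java sketch. This is not the actual Pulsar code: `Msg`, `deliverBatch`, and the `afterFirst` hook are hypothetical, and `isValidConsumerEpoch` / `redeliverUnacknowledgedMessages` here are simplified stand-ins that only compare against and bump an `AtomicLong`. The hook forces the user's call to land between `messages[0]` and `messages[1]` instead of relying on real thread scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class EpochRaceSketch {
    // Hypothetical stand-in for a received message tagged with the epoch it was fetched under.
    record Msg(int index, long epoch) {}

    // Stand-in for the consumer's CONSUMER_EPOCH field.
    static final AtomicLong consumerEpoch = new AtomicLong(0);

    // Simplified stand-in for the epoch check: a message is valid only if
    // its epoch equals the consumer's epoch at the moment of the check.
    static boolean isValidConsumerEpoch(Msg msg) {
        return msg.epoch() == consumerEpoch.get();
    }

    // Simplified stand-in for redeliverUnacknowledgedMessages(): only bumps the epoch.
    static void redeliverUnacknowledgedMessages() {
        consumerEpoch.incrementAndGet();
    }

    // Validates each message of the batch one at a time, re-reading the shared
    // epoch for every message. afterFirst runs right after messages[0] is checked,
    // simulating the user's concurrent redeliver call landing mid-batch.
    static List<Msg> deliverBatch(List<Msg> batch, Runnable afterFirst) {
        List<Msg> delivered = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            Msg m = batch.get(i);
            if (isValidConsumerEpoch(m)) {
                delivered.add(m);
            }
            if (i == 0) {
                afterFirst.run();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Msg> batch = List.of(new Msg(0, 0), new Msg(1, 0), new Msg(2, 0));
        List<Msg> delivered = deliverBatch(batch, EpochRaceSketch::redeliverUnacknowledgedMessages);
        System.out.println("epoch now: " + consumerEpoch.get());
        System.out.println("delivered: " + delivered);
    }
}
```

Running `main` prints `epoch now: 1` followed by `delivered: [Msg[index=0, epoch=0]]`: the batch is split, with `messages[0]` delivered under the old epoch and `messages[1..]` dropped, matching steps 3 to 7 of the timeline.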
