Penghui ran into this issue in their tests.
It may be caused by how un-acked messages are tracked in both the individual
ConsumerImpl and MultiTopicsConsumerImpl.
MultiTopicsConsumerImpl manages and tracks all un-acked messages itself.
But this causes problems when a MessageListener is used.
The reason is that MessageListener.received() is called only once per message,
at the moment the message is delivered from the underlying individual ConsumerImpl.
The call stack looks like this:
```
MultiTopicsConsumerImpl.startReceivingMessages()
    // tries to receive messages from all individual ConsumerImpl instances
\
MultiTopicsConsumerImpl.receiveMessageFromConsumer(ConsumerImpl consumer)
    // calls the individual ConsumerImpl.receiveAsync() to fetch messages, and
    // calls MessageListener.received() in the receiveAsync() callback
\
MultiTopicsConsumerImpl.messageReceived()
    // called in the ConsumerImpl.receiveAsync() callback
\
listener.received
```
ConsumerImpl.messageProcessed(), however, does not track messages that belong
to a MultiTopicsConsumerImpl:
```
if (partitionIndex != -1) {
    // we should no longer track this message, TopicsConsumer will take care
    // from now onwards
    unAckedMessageTracker.remove(id); // <=== we are not tracking messages
} else {
    unAckedMessageTracker.add(id);
}
```
So a timed-out message is not redelivered by ConsumerImpl, and, given the call
stack above, MessageListener.received() is not called for it again.
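
To make the effect of that branch concrete, here is a minimal, self-contained sketch. It is not the real Pulsar code: `UnackedTrackingSketch` and `Tracker` are hypothetical stand-ins, message ids are plain strings, and only the `partitionIndex` branch quoted above is modelled.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model; these are NOT the real Pulsar classes.
public class UnackedTrackingSketch {

    // Stand-in for UnAckedMessageTracker: just a set of message ids.
    public static class Tracker {
        private final Set<String> ids = new HashSet<>();

        public void add(String id) { ids.add(id); }
        public void remove(String id) { ids.remove(id); }
        public boolean isTracked(String id) { return ids.contains(id); }
    }

    // Mirrors the branch quoted from ConsumerImpl.messageProcessed().
    public static void messageProcessed(Tracker tracker, int partitionIndex, String id) {
        if (partitionIndex != -1) {
            // Child of a multi-topics consumer: the id is dropped here.
            tracker.remove(id);
        } else {
            // Standalone consumer: the id stays eligible for ack-timeout handling.
            tracker.add(id);
        }
    }

    public static void main(String[] args) {
        Tracker standalone = new Tracker();
        messageProcessed(standalone, -1, "msg-1");
        System.out.println("standalone tracked: " + standalone.isTracked("msg-1")); // true

        Tracker child = new Tracker();
        messageProcessed(child, 0, "msg-1");
        System.out.println("child tracked: " + child.isTracked("msg-1")); // false
    }
}
```

In this model the child consumer never holds the id, so nothing on its side can trigger a redelivery once the message times out. Any redelivery would have to be driven from the MultiTopicsConsumerImpl side.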
[ Full content available at:
https://github.com/apache/incubator-pulsar/issues/2574 ]