Penghui ran into this issue during testing.
It may be caused by how un-acked messages are tracked in both the individual 
ConsumerImpl and MultiTopicsConsumerImpl.


In MultiTopicsConsumerImpl, we manage and track all un-acked messages.  
But this causes problems when a MessageListener is used.  
This is because MessageListener.received() is called only once, when the 
message is delivered from the underlying individual ConsumerImpl.
The call stack looks like this:
```
MultiTopicsConsumerImpl.startReceivingMessages() // try to receive messages from all individual ConsumerImpl
\
MultiTopicsConsumerImpl.receiveMessageFromConsumer(ConsumerImpl consumer) // calls the individual ConsumerImpl.receiveAsync() to fetch messages, and calls MessageListener.received() in the receiveAsync() callback
\
MultiTopicsConsumerImpl.messageReceived() // called in the ConsumerImpl.receiveAsync() callback
\
listener.received
```
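
To illustrate why the listener fires only on delivery, here is a minimal toy model of that chain. It is a sketch, not the Pulsar client: `ChildConsumer`, `ParentConsumer`, and `drain()` are hypothetical names, and the real code path is asynchronous (`receiveAsync()` callbacks) rather than the synchronous loop used here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model only: these classes are hypothetical and are not the Pulsar client.
class ListenerDeliverySketch {

    // Stands in for an individual ConsumerImpl: a queue of delivered messages.
    static class ChildConsumer {
        final Queue<String> incoming = new ArrayDeque<>();

        // Hands over the next delivered message, or null if none is queued.
        String poll() {
            return incoming.poll();
        }
    }

    // Stands in for MultiTopicsConsumerImpl: drains children and fires the listener.
    static class ParentConsumer {
        final List<ChildConsumer> children = new ArrayList<>();
        final Consumer<String> listener;

        ParentConsumer(Consumer<String> listener) {
            this.listener = listener;
        }

        void drain() {
            for (ChildConsumer child : children) {
                String msg;
                while ((msg = child.poll()) != null) {
                    // The only place the listener is invoked in this model.
                    listener.accept(msg);
                }
            }
        }
    }

    // Delivers one message, then drains twice; returns what the listener saw.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        ChildConsumer child = new ChildConsumer();
        ParentConsumer parent = new ParentConsumer(seen::add);
        parent.children.add(child);

        child.incoming.add("m1");
        parent.drain(); // listener receives m1
        parent.drain(); // child delivered nothing new, so the listener is not called
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the listener can only see a message again if the child consumer puts it back into its queue, which is the redelivery step discussed next.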

Meanwhile, ConsumerImpl.messageProcessed() does not track messages that 
belong to a MultiTopicsConsumerImpl:
```
if (partitionIndex != -1) {
    // we should no longer track this message, TopicsConsumer will take care
    // from now onwards
    unAckedMessageTracker.remove(id);    // <=== we are not tracking messages
} else {
    unAckedMessageTracker.add(id);
}
```

So a timed-out message is not redelivered by ConsumerImpl and, given the call 
stack above, MessageListener.received() is not called for it again.
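
The effect of that branch can be sketched with a small toy model. This is an illustration under stated assumptions, not the actual implementation: `Tracker`, `expire()`, and `redeliveredAfterTimeout()` are hypothetical names, and `expire()` simply returns whatever the tracker still holds as a stand-in for an ack timeout.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: hypothetical classes, not the Pulsar client implementation.
class UnackedTrackingSketch {

    // Stand-in for an un-acked message tracker.
    static class Tracker {
        final Set<String> pending = new HashSet<>();

        void add(String id) {
            pending.add(id);
        }

        void remove(String id) {
            pending.remove(id);
        }

        // Returns the ids an ack timeout would ask to redeliver, then clears them.
        Set<String> expire() {
            Set<String> expired = new HashSet<>(pending);
            pending.clear();
            return expired;
        }
    }

    // Mirrors the branch quoted above from ConsumerImpl.messageProcessed().
    static void messageProcessed(Tracker childTracker, int partitionIndex, String id) {
        if (partitionIndex != -1) {
            // Child of a multi-topics consumer: the child stops tracking.
            childTracker.remove(id);
        } else {
            // Standalone consumer: the child tracks the message itself.
            childTracker.add(id);
        }
    }

    // Processes one message, then simulates the child's ack timeout.
    static Set<String> redeliveredAfterTimeout(int partitionIndex) {
        Tracker childTracker = new Tracker();
        messageProcessed(childTracker, partitionIndex, "m1");
        return childTracker.expire();
    }

    public static void main(String[] args) {
        System.out.println("standalone: " + redeliveredAfterTimeout(-1));
        System.out.println("child of multi-topics: " + redeliveredAfterTimeout(0));
    }
}
```

With `partitionIndex == -1` the child tracker still holds the message when the timeout fires. With any other index the tracker is empty, so in this model nothing at the child level triggers a redelivery.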

[ Full content available at: 
https://github.com/apache/incubator-pulsar/issues/2574 ]