jiazhai commented on issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
URL: https://github.com/apache/incubator-pulsar/issues/2574#issuecomment-421044188

Penghui ran into this issue in their tests.

It may be caused by how un-acked messages are tracked across the individual ConsumerImpl and MultiTopicsConsumerImpl. In MultiTopicsConsumerImpl, we manage and track all un-acked messages. But this causes problems when a MessageListener is used, because MessageListener.received() is called only once, when the message is delivered from the underlying individual ConsumerImpl. The call stack looks like this:

```
MultiTopicsConsumerImpl.startReceivingMessages() // try to receive messages from all individual ConsumerImpl
 \ MultiTopicsConsumerImpl.receiveMessageFromConsumer(ConsumerImpl consumer) // call individual ConsumerImpl.receiveAsync() to import messages, and call MessageListener.received() in the receiveAsync() callback.
   \ MultiTopicsConsumerImpl.messageReceived() // called in the ConsumerImpl.receiveAsync() callback.
     \ listener.received
```

Meanwhile, ConsumerImpl.messageProcessed() does not track messages when the consumer belongs to a MultiTopicsConsumerImpl:

```
if (partitionIndex != -1) {
    // we should no longer track this message, TopicsConsumer will take care from now onwards
    unAckedMessageTracker.remove(id); // <=== we are not tracking messages
} else {
    unAckedMessageTracker.add(id);
}
```

So a timed-out message is not redelivered by ConsumerImpl, and, given the call stack above, MessageListener.received() is not called again for it.
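To make the gap concrete, here is a minimal sketch of the branch quoted above, using a simplified stand-in model. `Tracker`, `ChildConsumer`, and `wouldRedeliverOnTimeout` are hypothetical names for illustration only and are not Pulsar's real classes or methods; the sketch assumes that a consumer can only redeliver on ack timeout what its own tracker still holds.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the tracking branch; not Pulsar's real code.
class UnackedTrackingSketch {

    /** Stand-in for an un-acked message tracker: holds ids still awaiting an ack. */
    static class Tracker {
        private final Set<String> pending = new HashSet<>();
        void add(String id) { pending.add(id); }
        void remove(String id) { pending.remove(id); }
        boolean contains(String id) { return pending.contains(id); }
    }

    /** Stand-in for an individual consumer; partitionIndex != -1 means it has a parent. */
    static class ChildConsumer {
        final int partitionIndex;
        final Tracker tracker = new Tracker();

        ChildConsumer(int partitionIndex) { this.partitionIndex = partitionIndex; }

        // Mirrors the quoted branch: a child of a multi-topics consumer stops tracking.
        void messageProcessed(String id) {
            if (partitionIndex != -1) {
                tracker.remove(id);
            } else {
                tracker.add(id);
            }
        }

        // Assumption: only ids still in this consumer's tracker can time out here.
        boolean wouldRedeliverOnTimeout(String id) {
            return tracker.contains(id);
        }
    }

    public static void main(String[] args) {
        ChildConsumer standalone = new ChildConsumer(-1);
        ChildConsumer child = new ChildConsumer(0);
        standalone.messageProcessed("msg-1");
        child.messageProcessed("msg-1");
        System.out.println("standalone redelivers: " + standalone.wouldRedeliverOnTimeout("msg-1"));
        System.out.println("child redelivers: " + child.wouldRedeliverOnTimeout("msg-1"));
    }
}
```

In this model the standalone consumer keeps the id and could redeliver it, while the child consumer drops it, so any redelivery would have to be driven by the parent, which is where the listener path described above falls short.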
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services