@codelipenghui meet this issue in their usage. 
The main reason is in UnAckedMessageTracker.
```
    public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, 
long ackTimeoutMillis) {
        this.stop();
        timeout = client.timer().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout t) throws Exception {
                if (isAckTimeout()) {   < === first timeout, it is false, 
because oldOpenSet is empty.
                    log.warn("[{}] {} messages have timed-out", consumerBase, 
oldOpenSet.size());
                    Set<MessageId> messageIds = new HashSet<>();
                    oldOpenSet.forEach(messageIds::add);
                    oldOpenSet.clear();
                    consumerBase.redeliverUnacknowledgedMessages(messageIds);
                }
                toggle();    < === toggle after timeout
                timeout = client.timer().newTimeout(this, ackTimeoutMillis, 
TimeUnit.MILLISECONDS);
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }
```
before first timeout, all messageId was added in CurrentSet, not in OldOpenSet, 
so isAckTimeout() is false, and `redeliverUnacknowledgedMessages` was not 
called at first timeout.

The fix may move `toggle()` from behind if clause to before if clause.

[ Full content available at: 
https://github.com/apache/incubator-pulsar/issues/2584 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to