### Motivation

unacked message is not redelivered after setting ackTimeout, but it is actually 
redelivered after 2*acktimeout.

The main reason is in UnAckedMessageTracker.
```
    public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, 
long ackTimeoutMillis) {
        this.stop();
        timeout = client.timer().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout t) throws Exception {
                if (isAckTimeout()) {   < === first timeout, it is false, 
because oldOpenSet is empty.
                    log.warn("[{}] {} messages have timed-out", consumerBase, 
oldOpenSet.size());
                    Set<MessageId> messageIds = new HashSet<>();
                    oldOpenSet.forEach(messageIds::add);
                    oldOpenSet.clear();
                    consumerBase.redeliverUnacknowledgedMessages(messageIds);
                }
                toggle();    < === toggle after timeout
                timeout = client.timer().newTimeout(this, ackTimeoutMillis, 
TimeUnit.MILLISECONDS);
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }
```
before first timeout, all messageId was added in CurrentSet, not in OldOpenSet, 
so isAckTimeout() is false, and `redeliverUnacknowledgedMessages` was not 
called at first timeout.

### Modifications

- move `toggle()` from behind if clause to before if clause.
- add ut

### Result

ut passed

[ Full content available at: 
https://github.com/apache/incubator-pulsar/pull/2590 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to