### Motivation
With `ackTimeout` set, an unacked message is not redelivered once `ackTimeout`
elapses; it is redelivered only after 2 * `ackTimeout`.
The root cause is in `UnAckedMessageTracker`:
```
public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase,
        long ackTimeoutMillis) {
    this.stop();
    timeout = client.timer().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout t) throws Exception {
            // First timeout: isAckTimeout() is false because oldOpenSet is empty.
            if (isAckTimeout()) {
                log.warn("[{}] {} messages have timed-out", consumerBase,
                        oldOpenSet.size());
                Set<MessageId> messageIds = new HashSet<>();
                oldOpenSet.forEach(messageIds::add);
                oldOpenSet.clear();
                consumerBase.redeliverUnacknowledgedMessages(messageIds);
            }
            // toggle() runs only after the timeout check.
            toggle();
            timeout = client.timer().newTimeout(this, ackTimeoutMillis,
                    TimeUnit.MILLISECONDS);
        }
    }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}
```
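Before the first timeout, every message ID is added to `currentSet`, not to
`oldOpenSet`, so `isAckTimeout()` returns false and
`redeliverUnacknowledgedMessages` is not called at the first timeout. The IDs
reach `oldOpenSet` only when `toggle()` runs at the end of that first timeout,
so they are redelivered at the second timeout.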
### Modifications
- Move `toggle()` from after the `if` clause to before it.
- Add a unit test.
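
The effect of the reordering can be sketched with a simplified, single-threaded model of the two-set tracker. This is not the real `UnAckedMessageTracker`: the class `TwoSetTrackerSketch`, its `tick()` method, the `toggleBeforeCheck` flag, and the string message IDs are all hypothetical names used for illustration, and the model assumes `toggle()` swaps the two sets.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the two-set tracker; NOT the real Pulsar class.
class TwoSetTrackerSketch {
    private Set<String> currentSet = new HashSet<>();
    private Set<String> oldOpenSet = new HashSet<>();
    // Hypothetical switch: true models the ordering proposed in this PR.
    private final boolean toggleBeforeCheck;

    TwoSetTrackerSketch(boolean toggleBeforeCheck) {
        this.toggleBeforeCheck = toggleBeforeCheck;
    }

    // Newly received, not yet acked messages always go into currentSet.
    void add(String messageId) {
        currentSet.add(messageId);
    }

    // Assumption: toggle() swaps the roles of the two sets.
    private void toggle() {
        Set<String> tmp = currentSet;
        currentSet = oldOpenSet;
        oldOpenSet = tmp;
    }

    // Models one timer firing; returns the IDs redelivered on this tick.
    List<String> tick() {
        if (toggleBeforeCheck) {
            toggle();
        }
        List<String> redelivered = new ArrayList<>(oldOpenSet);
        oldOpenSet.clear();
        if (!toggleBeforeCheck) {
            toggle();
        }
        return redelivered;
    }

    // Counts timer ticks until a message added before the first tick is redelivered.
    static int ticksUntilRedelivery(boolean toggleBeforeCheck) {
        TwoSetTrackerSketch tracker = new TwoSetTrackerSketch(toggleBeforeCheck);
        tracker.add("msg-1");
        for (int ticks = 1; ticks <= 10; ticks++) {
            if (tracker.tick().contains("msg-1")) {
                return ticks;
            }
        }
        return -1; // not redelivered within the bound
    }

    public static void main(String[] args) {
        System.out.println("toggle after check:  "
                + ticksUntilRedelivery(false) + " tick(s)");
        System.out.println("toggle before check: "
                + ticksUntilRedelivery(true) + " tick(s)");
    }
}
```

In this model, a message added before the first tick is redelivered on the second tick with the original ordering and on the first tick with `toggle()` moved ahead of the check. Note that, in the model, a message added shortly before a tick is then redelivered on that same tick, so the wait is at most one interval rather than exactly one.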
### Result
The unit test passed.
[ Full content available at:
https://github.com/apache/incubator-pulsar/pull/2590 ]