massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r642824270
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -697,20 +698,22 @@ public boolean isConsumerAvailable(Consumer consumer) {
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
- consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
- addMessageToReplay(ledgerId, entryId);
+ consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
+ addMessageToReplay(ledgerId, entryId, stickyKeyHash);
});
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
- messagesToRedeliver);
+ redeliveryMessages);
}
readMoreEntries();
}
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
positions.forEach(position -> {
- if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
+ // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
+ // on Key_Shared subscription, but it's difficult to get the sticky key here
+ if (addMessageToReplay(position.getLedgerId(), position.getEntryId(), null)) {
Review comment:
This method is executed when redelivery is requested from the consumer
side (e.g. negative ack, ack timeout), so in this case the user should accept
that the messages may be delivered out of order.
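
To illustrate the point, here is a minimal sketch of how a replay set might treat a missing sticky key hash. It is not the actual Pulsar code: `ReplaySketch`, `replayableNow`, and the `blockedHashes` parameter are hypothetical names, and the assumption is that the dispatcher holds back replay entries whose hash is currently blocked for ordering reasons. An entry added with a `null` hash can never be held back, so it may overtake other messages with the same key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the dispatcher's replay set; not Pulsar's real class.
public class ReplaySketch {
    // ledgerId -> (entryId -> sticky key hash, or null when the hash is unknown)
    private final TreeMap<Long, TreeMap<Long, Long>> replay = new TreeMap<>();

    // Returns true only if the position was not already queued for replay.
    public boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {
        TreeMap<Long, Long> entries = replay.computeIfAbsent(ledgerId, k -> new TreeMap<>());
        if (entries.containsKey(entryId)) {
            return false;
        }
        entries.put(entryId, stickyKeyHash);
        return true;
    }

    // Returns "ledgerId:entryId" for every position that may be replayed now.
    // Assumption: blockedHashes holds the key hashes whose ordering must be preserved.
    public List<String> replayableNow(Set<Long> blockedHashes) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<Long, Long>> ledger : replay.entrySet()) {
            for (Map.Entry<Long, Long> e : ledger.getValue().entrySet()) {
                Long hash = e.getValue();
                // A null hash cannot be matched against blockedHashes, so the
                // entry is replayed immediately and ordering is not guaranteed.
                if (hash == null || !blockedHashes.contains(hash)) {
                    out.add(ledger.getKey() + ":" + e.getKey());
                }
            }
        }
        return out;
    }
}
```

In this sketch, if entry 1:5 is added with hash 42 and entry 1:6 with `null`, then while hash 42 is blocked only 1:6 is returned, even if both messages carry the same key.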