This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 84588efc8ae72810892a245adcb9a0231c47dc57 Author: Lari Hotari <[email protected]> AuthorDate: Mon Feb 9 13:25:32 2026 +0200 Backport "[fix][client] Fix deadlock of NegativeAcksTracker (#23651)" --- .../pulsar/client/impl/NegativeAcksTracker.java | 32 ++++++++++++---------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index d6b86e3593d..eccb6c8f005 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -66,30 +66,32 @@ class NegativeAcksTracker implements Closeable { } } - private synchronized void triggerRedelivery(Timeout t) { - if (nackedMessages.isEmpty()) { - this.timeout = null; - return; - } - - // Group all the nacked messages into one single re-delivery request + private void triggerRedelivery(Timeout t) { Set<MessageId> messagesToRedeliver = new HashSet<>(); - long now = System.nanoTime(); - nackedMessages.forEach((msgId, timestamp) -> { - if (timestamp < now) { - addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); - messagesToRedeliver.add(msgId); + synchronized (this) { + if (nackedMessages.isEmpty()) { + this.timeout = null; + return; } - }); + long now = System.nanoTime(); + nackedMessages.forEach((msgId, timestamp) -> { + if (timestamp < now) { + addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); + messagesToRedeliver.add(msgId); + } + }); + this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); + } + + // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages, + // in which we may acquire the lock of consumer, leading to potential deadlock. if (!messagesToRedeliver.isEmpty()) { messagesToRedeliver.forEach(nackedMessages::remove); consumer.onNegativeAcksSend(messagesToRedeliver); log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size()); consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); } - - this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); } public synchronized void add(MessageId messageId) {
