This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7e8d0cc1d3c6c7b97cf8ee5dc8b1cc2ef9d1fdc3
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Feb 9 13:08:48 2026 +0200

    Revert "[fix][client] Fix deadlock of NegativeAcksTracker (#23651)"

    This reverts commit 3a533aff447665c0541888f890b4faeaed6c4336.
---
 .../pulsar/client/impl/NegativeAcksTracker.java | 40 ++++++++++------------
 1 file changed, 19 insertions(+), 21 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 5256ebf04f4..e1724ebb85c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -68,38 +68,36 @@ class NegativeAcksTracker implements Closeable {
         }
     }
 
-    private void triggerRedelivery(Timeout t) {
+    private synchronized void triggerRedelivery(Timeout t) {
+        if (nackedMessages.isEmpty()) {
+            this.timeout = null;
+            return;
+        }
+
+        // Group all the nacked messages into one single re-delivery request
         Set<MessageId> messagesToRedeliver = new HashSet<>();
-        synchronized (this) {
-            if (nackedMessages.isEmpty()) {
-                this.timeout = null;
-                return;
+        long now = System.nanoTime();
+        nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
+            if (timestamp < now) {
+                MessageId msgId = new MessageIdImpl(ledgerId, entryId,
+                        // need to covert non-partitioned topic partition index to -1
+                        (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
+                addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
+                messagesToRedeliver.add(msgId);
             }
+        });
 
-            long now = System.nanoTime();
-            nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
-                if (timestamp < now) {
-                    MessageId msgId = new MessageIdImpl(ledgerId, entryId,
-                            // need to covert non-partitioned topic partition index to -1
-                            (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
-                    addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
-                    messagesToRedeliver.add(msgId);
-                }
-            });
+        if (!messagesToRedeliver.isEmpty()) {
             for (MessageId messageId : messagesToRedeliver) {
                 nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
                         ((MessageIdImpl) messageId).getEntryId());
             }
-            this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
-        }
-
-        // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
-        // in which we may acquire the lock of consumer, leading to potential deadlock.
-        if (!messagesToRedeliver.isEmpty()) {
             consumer.onNegativeAcksSend(messagesToRedeliver);
             log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
             consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
         }
+
+        this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
     }
 
     public synchronized void add(MessageId messageId) {
