This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7e8d0cc1d3c6c7b97cf8ee5dc8b1cc2ef9d1fdc3
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Feb 9 13:08:48 2026 +0200

    Revert "[fix][client] Fix deadlock of NegativeAcksTracker (#23651)"
    
    This reverts commit 3a533aff447665c0541888f890b4faeaed6c4336.
---
 .../pulsar/client/impl/NegativeAcksTracker.java    | 40 ++++++++++------------
 1 file changed, 19 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 5256ebf04f4..e1724ebb85c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -68,38 +68,36 @@ class NegativeAcksTracker implements Closeable {
         }
     }
 
-    private void triggerRedelivery(Timeout t) {
+    private synchronized void triggerRedelivery(Timeout t) {
+        if (nackedMessages.isEmpty()) {
+            this.timeout = null;
+            return;
+        }
+
+        // Group all the nacked messages into one single re-delivery request
         Set<MessageId> messagesToRedeliver = new HashSet<>();
-        synchronized (this) {
-            if (nackedMessages.isEmpty()) {
-                this.timeout = null;
-                return;
+        long now = System.nanoTime();
+        nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) 
-> {
+            if (timestamp < now) {
+                MessageId msgId = new MessageIdImpl(ledgerId, entryId,
+                        // need to covert non-partitioned topic partition 
index to -1
+                        (int) (partitionIndex == 
NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
+                addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, 
messagesToRedeliver, this.consumer);
+                messagesToRedeliver.add(msgId);
             }
+        });
 
-            long now = System.nanoTime();
-            nackedMessages.forEach((ledgerId, entryId, partitionIndex, 
timestamp) -> {
-                if (timestamp < now) {
-                    MessageId msgId = new MessageIdImpl(ledgerId, entryId,
-                            // need to covert non-partitioned topic partition 
index to -1
-                            (int) (partitionIndex == 
NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
-                    addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, 
messagesToRedeliver, this.consumer);
-                    messagesToRedeliver.add(msgId);
-                }
-            });
+        if (!messagesToRedeliver.isEmpty()) {
             for (MessageId messageId : messagesToRedeliver) {
                 nackedMessages.remove(((MessageIdImpl) 
messageId).getLedgerId(),
                         ((MessageIdImpl) messageId).getEntryId());
             }
-            this.timeout = timer.newTimeout(this::triggerRedelivery, 
timerIntervalNanos, TimeUnit.NANOSECONDS);
-        }
-
-        // release the lock of NegativeAcksTracker before calling 
consumer.redeliverUnacknowledgedMessages,
-        // in which we may acquire the lock of consumer, leading to potential 
deadlock.
-        if (!messagesToRedeliver.isEmpty()) {
             consumer.onNegativeAcksSend(messagesToRedeliver);
             log.info("[{}] {} messages will be re-delivered", consumer, 
messagesToRedeliver.size());
             consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
         }
+
+        this.timeout = timer.newTimeout(this::triggerRedelivery, 
timerIntervalNanos, TimeUnit.NANOSECONDS);
     }
 
     public synchronized void add(MessageId messageId) {

Reply via email to