This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 84588efc8ae72810892a245adcb9a0231c47dc57
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Feb 9 13:25:32 2026 +0200

    Backport "[fix][client] Fix deadlock of NegativeAcksTracker (#23651)"
---
 .../pulsar/client/impl/NegativeAcksTracker.java    | 32 ++++++++++++----------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index d6b86e3593d..eccb6c8f005 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -66,30 +66,32 @@ class NegativeAcksTracker implements Closeable {
         }
     }
 
-    private synchronized void triggerRedelivery(Timeout t) {
-        if (nackedMessages.isEmpty()) {
-            this.timeout = null;
-            return;
-        }
-
-        // Group all the nacked messages into one single re-delivery request
+    private void triggerRedelivery(Timeout t) {
         Set<MessageId> messagesToRedeliver = new HashSet<>();
-        long now = System.nanoTime();
-        nackedMessages.forEach((msgId, timestamp) -> {
-            if (timestamp < now) {
-                addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
-                messagesToRedeliver.add(msgId);
+        synchronized (this) {
+            if (nackedMessages.isEmpty()) {
+                this.timeout = null;
+                return;
             }
-        });
 
+            long now = System.nanoTime();
+            nackedMessages.forEach((msgId, timestamp) -> {
+                if (timestamp < now) {
+                    addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
+                    messagesToRedeliver.add(msgId);
+                }
+            });
+            this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
+        }
+
+        // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
+        // in which we may acquire the lock of consumer, leading to potential deadlock.
         if (!messagesToRedeliver.isEmpty()) {
             messagesToRedeliver.forEach(nackedMessages::remove);
             consumer.onNegativeAcksSend(messagesToRedeliver);
             log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
             consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
         }
-
-        this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
     }
 
     public synchronized void add(MessageId messageId) {

Reply via email to