This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0693715ca096e55cb52e294decdbf39496d3ecec
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 06:18:54 2026 -0700

    [fix][broker] Fix stuck chunks in SharedConsumerAssignor permit tracking (#25620)
    
    (cherry picked from commit 759a5f520f81ba45caef9bd94a79997855a695fd)
---
 .../apache/pulsar/broker/service/SharedConsumerAssignor.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index bbf8dfd2b10..a317ad7560b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -89,7 +89,7 @@ public class SharedConsumerAssignor {
             if (metadata == null || !metadata.hasUuid() || 
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new 
ArrayList<>()).add(entryAndMetadata);
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, 
consumer, availablePermits);
+                final Consumer consumerForUuid = getConsumerForUuid(metadata, 
consumer);
                 if (consumerForUuid == null) {
                     unassignedMessageProcessor.accept(entryAndMetadata);
                     continue;
@@ -120,9 +120,7 @@ public class SharedConsumerAssignor {
         return null;
     }
 
-    private Consumer getConsumerForUuid(final MessageMetadata metadata,
-                                        final Consumer defaultConsumer,
-                                        final int currentAvailablePermits) {
+    private Consumer getConsumerForUuid(final MessageMetadata metadata, final 
Consumer defaultConsumer) {
         final String uuid = metadata.getUuid();
         Consumer consumer = uuidToConsumer.get(uuid);
         if (consumer == null) {
@@ -141,7 +139,9 @@ public class SharedConsumerAssignor {
             // The last chunk is received, we should remove the cache
             uuidToConsumer.remove(uuid);
         }
-        consumerToPermits.put(consumer, currentAvailablePermits - 1);
+        // Decrement target consumer's permits, not the loop's local availablePermits — on a cache
+        // redirect those track different consumers.
+        consumerToPermits.put(consumer, permits - 1);
         return consumer;
     }
 }

Reply via email to