This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8b3245f25fa51b5d4a2576af361608208f42a662
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Oct 28 10:50:18 2025 +0800

    [fix][broker] Fix totalAvailablePermits not reduced when removing consumer from non-persistent dispatcher (#24885)
    
    (cherry picked from commit ebfff5814ed21811bc3f6ae076fb265c7266c2d8)
---
 .../NonPersistentDispatcherMultipleConsumers.java  |  2 ++
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 26 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5941093e71c..f729175f70c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -106,6 +106,8 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
                     closeFuture.complete(null);
                 }
                 TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0);
+            } else {
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.getAvailablePermits());
             }
         } else {
             if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index 8c495cbd9db..d0c8a0c7dd2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -106,6 +106,32 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
         assertTrue(selector.getConsumerKeyHashRanges().isEmpty());
     }
 
+    @Test(timeOut = 10000)
+    public void testTotalAvailablePermitsWhenRemoveConsumer() throws Exception {
+        final int permits = 10;
+        // add 2 consumers to dispatcher
+        Consumer consumerMock = mock(Consumer.class);
+        when(consumerMock.getAvailablePermits()).thenReturn(permits);
+        nonpersistentDispatcher.addConsumer(consumerMock);
+        Consumer consumerMock2 = mock(Consumer.class);
+        when(consumerMock2.getAvailablePermits()).thenReturn(permits);
+        when(consumerMock2.isWritable()).thenReturn(true);
+        nonpersistentDispatcher.addConsumer(consumerMock2);
+
+        // add 10 flow permits to the dispatcher, so TOTAL_AVAILABLE_PERMITS_UPDATER will be 10
+        nonpersistentDispatcher.consumerFlow(consumerMock2, permits);
+        assertEquals(NonPersistentDispatcherMultipleConsumers
+                        .TOTAL_AVAILABLE_PERMITS_UPDATER.get(nonpersistentDispatcher), 10);
+        // add another 10 flow permits to the dispatcher, so TOTAL_AVAILABLE_PERMITS_UPDATER will be 20
+        nonpersistentDispatcher.consumerFlow(consumerMock, permits);
+        assertEquals(NonPersistentDispatcherMultipleConsumers.
+                        TOTAL_AVAILABLE_PERMITS_UPDATER.get(nonpersistentDispatcher), 20);
+        // remove one consumer, so the TOTAL_AVAILABLE_PERMITS_UPDATER of the dispatcher will be reduced to 10
+        nonpersistentDispatcher.removeConsumer(consumerMock);
+        assertEquals(NonPersistentDispatcherMultipleConsumers.
+                TOTAL_AVAILABLE_PERMITS_UPDATER.get(nonpersistentDispatcher), 10);
+    }
+
     @Test(timeOut = 10000)
     public void testSendMessage() throws BrokerServiceException {
         Consumer consumerMock = mock(Consumer.class);

Reply via email to