This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8b3245f25fa51b5d4a2576af361608208f42a662 Author: Ruimin MA <[email protected]> AuthorDate: Tue Oct 28 10:50:18 2025 +0800 [fix][broker] Fix totalAvailablePermits not reduced when removing consumer from non-persistent dispatcher (#24885) (cherry picked from commit ebfff5814ed21811bc3f6ae076fb265c7266c2d8) --- .../NonPersistentDispatcherMultipleConsumers.java | 2 ++ ...ntStickyKeyDispatcherMultipleConsumersTest.java | 26 ++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 5941093e71c..f729175f70c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -106,6 +106,8 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher closeFuture.complete(null); } TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0); + } else { + TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.getAvailablePermits()); } } else { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java index 8c495cbd9db..d0c8a0c7dd2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -106,6 +106,32 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { assertTrue(selector.getConsumerKeyHashRanges().isEmpty()); } + @Test(timeOut = 10000) + public void testTotalAvailablePermitsWhenRemoveConsumer() throws Exception { + final int permits = 10; + // add 2 consumers to dispatcher + Consumer consumerMock = mock(Consumer.class); + when(consumerMock.getAvailablePermits()).thenReturn(permits); + nonpersistentDispatcher.addConsumer(consumerMock); + Consumer consumerMock2 = mock(Consumer.class); + when(consumerMock2.getAvailablePermits()).thenReturn(permits); + when(consumerMock2.isWritable()).thenReturn(true); + nonpersistentDispatcher.addConsumer(consumerMock2); + + // add 10 flow permits to the dispatcher, so TOTAL_AVAILABLE_PERMITS_UPDATER will be 10 + nonpersistentDispatcher.consumerFlow(consumerMock2, permits); + assertEquals(NonPersistentDispatcherMultipleConsumers + .TOTAL_AVAILABLE_PERMITS_UPDATER.get(nonpersistentDispatcher), 10); + // add another 10 flow permits to the dispatcher, so TOTAL_AVAILABLE_PERMITS_UPDATER will be 20 + nonpersistentDispatcher.consumerFlow(consumerMock, permits); + assertEquals(NonPersistentDispatcherMultipleConsumers. + TOTAL_AVAILABLE_PERMITS_UPDATER.get(nonpersistentDispatcher), 20); + // remove one consumer, so the TOTAL_AVAILABLE_PERMITS_UPDATER of the dispatcher will be reduced to 10 + nonpersistentDispatcher.removeConsumer(consumerMock); + assertEquals(NonPersistentDispatcherMultipleConsumers. + TOTAL_AVAILABLE_PERMITS_UPDATER.get(nonpersistentDispatcher), 10); + } + @Test(timeOut = 10000) public void testSendMessage() throws BrokerServiceException { Consumer consumerMock = mock(Consumer.class);
