This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a08f0f506e5985df50a818e601ee4b3869e1ce02 Author: Qiang Zhao <[email protected]> AuthorDate: Mon Jan 10 19:27:26 2022 +0800 [Java Client] fixed Producer semaphore permit release issue (#13682) (cherry picked from commit 916b61d3a497c44c8511347d2c4eac95c53f8506) --- .../apache/pulsar/client/impl/ProducerSemaphoreTest.java | 14 ++++++++++++++ .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 16 +++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index c719cbd..0c7f4b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -75,11 +75,13 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync()); } Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages); + Assert.assertFalse(producer.isErrorStat()); } finally { producer.getClientCnx().channel().config().setAutoRead(true); } FutureUtil.waitForAll(futures).get(); Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); futures.clear(); // Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1 @@ -92,15 +94,18 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { futures.add(producer.sendAsync(msg)); } Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages/2); + Assert.assertFalse(producer.isErrorStat()); } finally { producer.getClientCnx().channel().config().setAutoRead(true); } FutureUtil.waitForAll(futures).get(); 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); futures.clear(); // Here must ensure that the semaphore available permits is 0 Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); // Acquire 5 and not wait the send ack call back producer.getClientCnx().channel().config().setAutoRead(false); @@ -111,12 +116,14 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { // Here must ensure that the Semaphore a acquired 5 Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages / 2); + Assert.assertFalse(producer.isErrorStat()); } finally { producer.getClientCnx().channel().config().setAutoRead(true); } FutureUtil.waitForAll(futures).get(); Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); } /** @@ -141,6 +148,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { // Test that when we fill the queue with "replicator" messages, we are notified // (replicator itself would block) Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); producer.getClientCnx().channel().config().setAutoRead(false); try { for (int i = 0; i < pendingQueueSize; i++) { @@ -151,6 +159,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { futures.add(producer.sendAsync(msg)); } Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0); + Assert.assertFalse(producer.isErrorStat()); try { MessageMetadata metadata = new MessageMetadata() .setNumMessagesInBatch(10); @@ -162,6 +171,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { Assert.assertEquals(ee.getCause().getClass(), PulsarClientException.ProducerQueueIsFullError.class); 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0); + Assert.assertFalse(producer.isErrorStat()); } } finally { producer.getClientCnx().channel().config().setAutoRead(true); @@ -171,12 +181,14 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { // Test that when we fill the queue with normal messages, we get an error Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); producer.getClientCnx().channel().config().setAutoRead(false); try { for (int i = 0; i < pendingQueueSize; i++) { futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync()); } Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0); + Assert.assertFalse(producer.isErrorStat()); try { producer.newMessage().value(("Semaphore-test-Q-full").getBytes()).sendAsync().get(); @@ -184,6 +196,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { Assert.assertEquals(ee.getCause().getClass(), PulsarClientException.ProducerQueueIsFullError.class); Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0); + Assert.assertFalse(producer.isErrorStat()); } } finally { @@ -191,5 +204,6 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { } FutureUtil.waitForAll(futures).get(); Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + Assert.assertFalse(producer.isErrorStat()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index f5165d7..46649bd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -266,11 +266,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne protected void semaphoreRelease(final 
int releaseCountRequest) { if (semaphore.isPresent()) { if (!errorState) { - final int availablePermits = semaphore.get().availablePermits(); - if (availablePermits - releaseCountRequest < 0) { - log.error("Semaphore permit release count request greater then availablePermits" + - " : availablePermits={}, releaseCountRequest={}", - availablePermits, releaseCountRequest); + final int availableReleasePermits = + conf.getMaxPendingMessages() - this.semaphore.get().availablePermits(); + if (availableReleasePermits - releaseCountRequest < 0) { + log.error("Semaphore permit release count request greater then availableReleasePermits" + + " : availableReleasePermits={}, releaseCountRequest={}", + availableReleasePermits, releaseCountRequest); errorState = true; } } @@ -2043,5 +2044,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return semaphore; } + @VisibleForTesting + boolean isErrorStat() { + return errorState; + } + private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); }
