This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a08f0f506e5985df50a818e601ee4b3869e1ce02
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Jan 10 19:27:26 2022 +0800

    [Java Client] fixed Producer semaphore permit release issue (#13682)
    
    (cherry picked from commit 916b61d3a497c44c8511347d2c4eac95c53f8506)
---
 .../apache/pulsar/client/impl/ProducerSemaphoreTest.java | 14 ++++++++++++++
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 16 +++++++++++-----
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index c719cbd..0c7f4b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -75,11 +75,13 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 futures.add(producer.newMessage().value(("Semaphore-test-" + 
i).getBytes()).sendAsync());
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         futures.clear();
 
         // Simulate replicator, non batching message but `numMessagesInBatch` 
of message metadata > 1
@@ -92,15 +94,18 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 futures.add(producer.sendAsync(msg));
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages/2);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         futures.clear();
 
         // Here must ensure that the semaphore available permits is 0
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
 
         // Acquire 5 and not wait the send ack call back
         producer.getClientCnx().channel().config().setAutoRead(false);
@@ -111,12 +116,14 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 
             // Here must ensure that the Semaphore a acquired 5
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages / 2);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
 
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
     }
 
     /**
@@ -141,6 +148,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
         // Test that when we fill the queue with "replicator" messages, we are 
notified
         // (replicator itself would block)
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         producer.getClientCnx().channel().config().setAutoRead(false);
         try {
             for (int i = 0; i < pendingQueueSize; i++) {
@@ -151,6 +159,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 futures.add(producer.sendAsync(msg));
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+            Assert.assertFalse(producer.isErrorStat());
             try {
                 MessageMetadata metadata = new MessageMetadata()
                         .setNumMessagesInBatch(10);
@@ -162,6 +171,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 Assert.assertEquals(ee.getCause().getClass(),
                                     
PulsarClientException.ProducerQueueIsFullError.class);
                 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+                Assert.assertFalse(producer.isErrorStat());
             }
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
@@ -171,12 +181,14 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 
         // Test that when we fill the queue with normal messages, we get an 
error
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         producer.getClientCnx().channel().config().setAutoRead(false);
         try {
             for (int i = 0; i < pendingQueueSize; i++) {
                 futures.add(producer.newMessage().value(("Semaphore-test-" + 
i).getBytes()).sendAsync());
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+            Assert.assertFalse(producer.isErrorStat());
 
             try {
                 
producer.newMessage().value(("Semaphore-test-Q-full").getBytes()).sendAsync().get();
@@ -184,6 +196,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 Assert.assertEquals(ee.getCause().getClass(),
                                     
PulsarClientException.ProducerQueueIsFullError.class);
                 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+                Assert.assertFalse(producer.isErrorStat());
 
             }
         } finally {
@@ -191,5 +204,6 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f5165d7..46649bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -266,11 +266,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     protected void semaphoreRelease(final int releaseCountRequest) {
         if (semaphore.isPresent()) {
             if (!errorState) {
-                final int availablePermits = semaphore.get().availablePermits();
-                if (availablePermits - releaseCountRequest < 0) {
-                    log.error("Semaphore permit release count request greater then availablePermits" +
-                                    " : availablePermits={}, releaseCountRequest={}",
-                            availablePermits, releaseCountRequest);
+                final int availableReleasePermits =
+                        conf.getMaxPendingMessages() - this.semaphore.get().availablePermits();
+                if (availableReleasePermits - releaseCountRequest < 0) {
+                    log.error("Semaphore permit release count request greater then availableReleasePermits" +
+                                    " : availableReleasePermits={}, releaseCountRequest={}",
+                            availableReleasePermits, releaseCountRequest);
                     errorState = true;
                 }
             }
@@ -2043,5 +2044,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return semaphore;
     }
 
+    @VisibleForTesting
+    boolean isErrorStat() {
+        return errorState;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
 }

Reply via email to