This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 916b61d  [Java Client] fixed Producer semaphore permit release issue (#13682)
916b61d is described below

commit 916b61d3a497c44c8511347d2c4eac95c53f8506
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Jan 10 19:27:26 2022 +0800

    [Java Client] fixed Producer semaphore permit release issue (#13682)
---
 .../apache/pulsar/client/impl/ProducerSemaphoreTest.java | 14 ++++++++++++++
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 16 +++++++++++-----
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 4e93124..78fc659 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -72,11 +72,13 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 futures.add(producer.newMessage().value(("Semaphore-test-" + 
i).getBytes()).sendAsync());
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         futures.clear();
 
         // Simulate replicator, non batching message but `numMessagesInBatch` 
of message metadata > 1
@@ -89,15 +91,18 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 futures.add(producer.sendAsync(msg));
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages/2);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         futures.clear();
 
         // Here must ensure that the semaphore available permits is 0
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
 
         // Acquire 5 and not wait the send ack call back
         producer.getClientCnx().channel().config().setAutoRead(false);
@@ -108,12 +113,14 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 
             // Here must ensure that the Semaphore a acquired 5
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages / 2);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
 
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
     }
 
     /**
@@ -138,6 +145,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
         // Test that when we fill the queue with "replicator" messages, we are 
notified
         // (replicator itself would block)
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         producer.getClientCnx().channel().config().setAutoRead(false);
         try {
             for (int i = 0; i < pendingQueueSize; i++) {
@@ -148,6 +156,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 futures.add(producer.sendAsync(msg));
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+            Assert.assertFalse(producer.isErrorStat());
             try {
                 MessageMetadata metadata = new MessageMetadata()
                         .setNumMessagesInBatch(10);
@@ -159,6 +168,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 Assert.assertEquals(ee.getCause().getClass(),
                                     
PulsarClientException.ProducerQueueIsFullError.class);
                 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+                Assert.assertFalse(producer.isErrorStat());
             }
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
@@ -168,12 +178,14 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 
         // Test that when we fill the queue with normal messages, we get an 
error
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         producer.getClientCnx().channel().config().setAutoRead(false);
         try {
             for (int i = 0; i < pendingQueueSize; i++) {
                 futures.add(producer.newMessage().value(("Semaphore-test-" + 
i).getBytes()).sendAsync());
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+            Assert.assertFalse(producer.isErrorStat());
 
             try {
                 
producer.newMessage().value(("Semaphore-test-Q-full").getBytes()).sendAsync().get();
@@ -181,6 +193,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 Assert.assertEquals(ee.getCause().getClass(),
                                     
PulsarClientException.ProducerQueueIsFullError.class);
                 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+                Assert.assertFalse(producer.isErrorStat());
 
             }
         } finally {
@@ -188,5 +201,6 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cf9a234..2f630ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -266,11 +266,12 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     protected void semaphoreRelease(final int releaseCountRequest) {
         if (semaphore.isPresent()) {
             if (!errorState) {
-                final int availablePermits = 
semaphore.get().availablePermits();
-                if (availablePermits - releaseCountRequest < 0) {
-                    log.error("Semaphore permit release count request greater 
then availablePermits" +
-                                    " : availablePermits={}, 
releaseCountRequest={}",
-                            availablePermits, releaseCountRequest);
+                final int availableReleasePermits =
+                        conf.getMaxPendingMessages() - 
this.semaphore.get().availablePermits();
+                if (availableReleasePermits - releaseCountRequest < 0) {
+                    log.error("Semaphore permit release count request greater 
then availableReleasePermits" +
+                                    " : availableReleasePermits={}, 
releaseCountRequest={}",
+                            availableReleasePermits, releaseCountRequest);
                     errorState = true;
                 }
             }
@@ -2107,5 +2108,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return semaphore;
     }
 
+    @VisibleForTesting
+    boolean isErrorStat() {
+        return errorState;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProducerImpl.class);
 }

Reply via email to