This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4f87667881065f8a5d039fc78536c821ae907df9
Author: Ali Ahmed <[email protected]>
AuthorDate: Wed Oct 20 13:16:24 2021 -0700

    Add log error tracking for semaphore count leak (#12410)
    
    Co-authored-by: Ali Ahmed <[email protected]>
    (cherry picked from commit 7c219b11966d4eb8cc20111468c3439d23f8777c)
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index bbf75ee..1bd0977 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -140,6 +140,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     private Optional<Long> topicEpoch = Optional.empty();
 
+    private boolean errorState;
+
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> 
msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -250,6 +252,21 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         grabCnx();
     }
 
+    protected void semaphoreRelease(final int releaseCountRequest) {
+        if (semaphore.isPresent()) {
+            if (!errorState) {
+                final int availablePermits = 
semaphore.get().availablePermits();
+                if (availablePermits - releaseCountRequest < 0) {
+                    log.error("Semaphore permit release count request greater 
then availablePermits" +
+                                    " : availablePermits={}, 
releaseCountRequest={}",
+                            availablePermits, releaseCountRequest);
+                    errorState = true;
+                }
+            }
+            semaphore.get().release(releaseCountRequest);
+        }
+    }
+
     protected OpSendMsgQueue createPendingMessagesQueue() {
         return new OpSendMsgQueue();
     }
@@ -1012,9 +1029,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     }
 
     private void releaseSemaphoreForSendOp(OpSendMsg op) {
-        if (semaphore.isPresent()) {
-            semaphore.get().release(isBatchMessagingEnabled() ? 
op.numMessagesInBatch : 1);
-        }
+
+        semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 
1);
+
         client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
     }
 
@@ -1731,7 +1748,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             });
 
             pendingMessages.clear();
-            semaphore.ifPresent(s -> s.release(releaseCount.get()));
+            semaphoreRelease(releaseCount.get());
             if (batchMessagingEnabled) {
                 failPendingBatchMessages(ex);
             }
@@ -1757,7 +1774,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
         final int numMessagesInBatch = 
batchMessageContainer.getNumMessagesInBatch();
         batchMessageContainer.discard(ex);
-        semaphore.ifPresent(s -> s.release(numMessagesInBatch));
+        semaphoreRelease(numMessagesInBatch);
     }
 
     @Override
@@ -1800,9 +1817,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     processOpSendMsg(opSendMsg);
                 }
             } catch (PulsarClientException e) {
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
             } catch (Throwable t) {
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
                 log.warn("[{}] [{}] error while create opSendMsg by batch 
message container", topic, producerName, t);
             }
         }

Reply via email to