This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4f87667881065f8a5d039fc78536c821ae907df9
Author: Ali Ahmed <[email protected]>
AuthorDate: Wed Oct 20 13:16:24 2021 -0700

    Add log error tracking for semaphore count leak (#12410)

    Co-authored-by: Ali Ahmed <[email protected]>
    (cherry picked from commit 7c219b11966d4eb8cc20111468c3439d23f8777c)
---
 .../apache/pulsar/client/impl/ProducerImpl.java | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index bbf75ee..1bd0977 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -140,6 +140,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private Optional<Long> topicEpoch = Optional.empty();
 
+    private boolean errorState;
+
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -250,6 +252,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         grabCnx();
     }
 
+    protected void semaphoreRelease(final int releaseCountRequest) {
+        if (semaphore.isPresent()) {
+            if (!errorState) {
+                final int availablePermits = semaphore.get().availablePermits();
+                if (availablePermits - releaseCountRequest < 0) {
+                    log.error("Semaphore permit release count request greater then availablePermits"
+                                    + " : availablePermits={}, releaseCountRequest={}",
+                            availablePermits, releaseCountRequest);
+                    errorState = true;
+                }
+            }
+            semaphore.get().release(releaseCountRequest);
+        }
+    }
+
     protected OpSendMsgQueue createPendingMessagesQueue() {
         return new OpSendMsgQueue();
     }
@@ -1012,9 +1029,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private void releaseSemaphoreForSendOp(OpSendMsg op) {
-        if (semaphore.isPresent()) {
-            semaphore.get().release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
-        }
+
+        semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
+
         client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
     }
 
@@ -1731,7 +1748,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             });
 
             pendingMessages.clear();
-            semaphore.ifPresent(s -> s.release(releaseCount.get()));
+            semaphoreRelease(releaseCount.get());
             if (batchMessagingEnabled) {
                 failPendingBatchMessages(ex);
             }
@@ -1757,7 +1774,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
         final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
         batchMessageContainer.discard(ex);
-        semaphore.ifPresent(s -> s.release(numMessagesInBatch));
+        semaphoreRelease(numMessagesInBatch);
     }
 
     @Override
@@ -1800,9 +1817,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     processOpSendMsg(opSendMsg);
                 }
             } catch (PulsarClientException e) {
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
             } catch (Throwable t) {
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
                 log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
             }
         }
