aahmed-se commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718786230



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1018,14 +1021,17 @@ private long getHighestSequenceId(OpSendMsg op) {
     }
 
     private void releaseSemaphoreForSendOp(OpSendMsg op) {
-        if (semaphore.isPresent()) {
-            semaphore.get().release(isBatchMessagingEnabled() ? 
op.numMessagesInBatch : 1);
-        }
+        semaphore.ifPresent(semaphore -> semaphore.release(
+                Math.min(
+                        maxPermits - semaphore.availablePermits(),

Review comment:
       We are not trying to  solve all of the issues here, this is a simple 
overflow protection fix.
   The root cause still has to be figured out but this is simpler alternative 
to implementing a bounded semaphore.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       This limits this issue https://github.com/apache/pulsar/issues/12151 
happening in prod. Since we implementing a max limit. 

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       Let's limit the count but add a logger , that will prevent the blow up 
in production but still record the issue.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       Let's limit the count but add a logger, that will prevent the blow up in 
production but still record the issue.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       creating a bounded semaphore seems to be a popular approach to safeguard 
production that's what I tried to emulate.
   
https://github.com/googleapis/gax-java/blob/49efdc38f2d6ca3838dcecee63dc6cd07f6c2b23/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> 
s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       I try and limit the total number of permits that can be held. Regardless 
of release requests.
   
   It's similar to this 
https://github.com/googleapis/gax-java/blob/49efdc38f2/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java
 but I am not throwing any exceptions but silently disgarding the requests. I 
think logging the makes sense though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to