merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718806516



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       This is just masking the problem. If the counts are out of sync, this 
won't help either, because it will not put them back in sync. We need to 
identify why they're out of sync.
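
One way to see why the clamp does not restore sync is a small standalone sketch. It assumes a plain `java.util.concurrent.Semaphore`; the class name, the `inFlight` counter, and the "stray release" are hypothetical stand-ins for whatever is actually causing the mismatch, not code from `ProducerImpl`.

```java
import java.util.concurrent.Semaphore;

final class ClampedReleaseDemo {
    // Drift = free permits + messages in flight - configured maximum.
    // Zero means the semaphore and the real in-flight count agree.
    static int drift(Semaphore s, int inFlight, int maxPermits) {
        return s.availablePermits() + inFlight - maxPermits;
    }

    // Returns {drift before the clamped release, drift after it}.
    static int[] driftAroundClampedRelease(int maxPermits) throws InterruptedException {
        Semaphore s = new Semaphore(maxPermits);
        int inFlight = 0;

        s.acquire(3);          // three messages enter the pending queue
        inFlight += 3;

        s.release(2);          // hypothetical bug: permits released with no matching completion
        int before = drift(s, inFlight, maxPermits);

        int batch = 1;         // a batch of one message fails and takes the clamped path
        s.release(Math.min(maxPermits - s.availablePermits(), batch));
        inFlight -= batch;
        int after = drift(s, inFlight, maxPermits);

        return new int[] {before, after};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] d = driftAroundClampedRelease(10);
        System.out.println("drift before=" + d[0] + ", after=" + d[1]);
    }
}
```

In this scenario the semaphore reports every permit as free while two messages are still in flight, so the clamp has capped the counter without correcting it.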

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       I don't think it limits the issue. It simply masks it by making the 
semaphore useless, in that acquire/release will end up either always blocking 
or never blocking.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       So, it's actually preferable to have an exception that identifies that 
we have an issue (provided that we can restart from there) rather than just 
masking the problem, at least until a proper fix for the root cause is 
implemented.
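
As a rough illustration of the fail-fast alternative, here is a minimal sketch of a wrapper that throws when more permits are released than were acquired. `CheckedSemaphore` and its `held` counter are hypothetical names introduced for this example; nothing like it is claimed to exist in the Pulsar client.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: surfaces over-release as an exception instead of hiding it.
final class CheckedSemaphore {
    private final Semaphore semaphore;
    // Permits currently acquired through this wrapper.
    private final AtomicInteger held = new AtomicInteger();

    CheckedSemaphore(int maxPermits) {
        this.semaphore = new Semaphore(maxPermits);
    }

    void acquire(int n) throws InterruptedException {
        semaphore.acquire(n);
        held.addAndGet(n);
    }

    void release(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("negative permit count: " + n);
        }
        int remaining = held.addAndGet(-n);
        if (remaining < 0) {
            held.addAndGet(n); // undo, so the wrapper stays usable after the failure
            throw new IllegalStateException(
                    "released " + n + " permits but only " + (remaining + n) + " were held");
        }
        semaphore.release(n);
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        CheckedSemaphore cs = new CheckedSemaphore(5);
        cs.acquire(2);
        cs.release(2);
        try {
            cs.release(1); // one release too many
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The exception points at the call site that over-released, which is the information needed to track down the root cause, and the permit count is left untouched so the caller can continue.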

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       I don't think limiting the counter is a good idea. Making sure we 
recover from the exception is a better approach. 

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       The terminology is not correct here. The semaphore *is* bounded (that 
is its only purpose): it only allows a fixed number of acquires.

   The problem arises if we release it more times than it was acquired, but 
that doesn't make it unbounded.
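
For reference, the over-release behaviour is easy to reproduce in isolation. The sketch below assumes a plain `java.util.concurrent.Semaphore`, which does not require a release to be paired with an earlier acquire; the class and method names are made up for the example.

```java
import java.util.concurrent.Semaphore;

final class OverReleaseDemo {
    // Acquires one permit, then releases twice, and returns the resulting permit count.
    static int permitsAfterExtraRelease(int maxPermits) throws InterruptedException {
        Semaphore s = new Semaphore(maxPermits);
        s.acquire();   // maxPermits - 1 permits left
        s.release();   // back to maxPermits
        s.release();   // unmatched release: one permit more than the initial count
        return s.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(permitsAfterExtraRelease(10)); // prints 11
    }
}
```

Acquires still block once the permits run out, so the semaphore remains bounded; the extra release only shifts the count it is bounded by.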





