merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718806516
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
}
} catch (PulsarClientException e) {
Thread.currentThread().interrupt();
- semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+ semaphore.ifPresent(s -> s.release(Math.min(
+ maxPermits - s.availablePermits(),
+ batchMessageContainer.getNumMessagesInBatch()))
Review comment:
This is just masking the problem. If the counts are out of sync, this
won't help either, because it won't put them back in sync. We need to
identify why they're out of sync.
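To make the point concrete, here is a minimal sketch that uses `java.util.concurrent.Semaphore` directly rather than the real `ProducerImpl`. The limit of 10, the two batches, and the duplicate release of batch A are all hypothetical; `clampedRelease` only reproduces the `Math.min` arithmetic from the diff. The clamp keeps `availablePermits()` at or below the limit, yet the count still disagrees with what is actually in flight:

```java
import java.util.concurrent.Semaphore;

class ClampedReleaseDemo {
    // Hypothetical limit standing in for the producer's maxPermits.
    static final int MAX_PERMITS = 10;

    // Same arithmetic as the proposed change: cap the release so that
    // availablePermits() never climbs above MAX_PERMITS.
    static void clampedRelease(Semaphore s, int n) {
        s.release(Math.min(MAX_PERMITS - s.availablePermits(), n));
    }

    // Returns {availablePermits, permits actually held by in-flight batches}.
    static int[] run() {
        Semaphore s = new Semaphore(MAX_PERMITS);
        int inFlight = 0;

        s.acquireUninterruptibly(4); // batch A sent
        inFlight += 4;
        clampedRelease(s, 4);        // batch A completes
        inFlight -= 4;

        s.acquireUninterruptibly(4); // batch B sent
        inFlight += 4;
        clampedRelease(s, 4);        // hypothetical bug: batch A released again

        return new int[] {s.availablePermits(), inFlight};
    }

    public static void main(String[] args) {
        int[] r = run();
        // The clamp kept availablePermits() <= MAX_PERMITS, yet batch B still
        // holds 4 permits, so up to 14 messages can now be in flight.
        System.out.println("available=" + r[0] + " inFlight=" + r[1]);
    }
}
```

Running `main` prints `available=10 inFlight=4`: the semaphore believes every permit is free while batch B still holds four. If batch B then completes, its own release is clamped to zero, so the drift is never corrected.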
##########
Review comment:
I don't think it limits the issue. It simply masks it by making the
semaphore useless: acquire/release will end up either always blocking
or never blocking.
##########
Review comment:
So it's actually preferable to have an exception that signals we have
an issue (provided that we can restart from there) rather than just
masking the problem, at least until a proper fix for the root cause is
implemented.
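One way to get that fail-loudly behavior is to track outstanding permits next to the semaphore and throw on over-release. This is a minimal sketch built around a hypothetical `CheckedPermits` wrapper; it is not part of the Pulsar client, and the root cause of the divergence would still need to be found:

```java
import java.util.concurrent.Semaphore;

// Hypothetical wrapper (not a Pulsar class): keeps a count of permits that were
// actually acquired and refuses to release more than that.
final class CheckedPermits {
    private final Semaphore semaphore;
    private int outstanding; // guarded by "this"

    CheckedPermits(int maxPermits) {
        this.semaphore = new Semaphore(maxPermits);
    }

    void acquire(int n) {
        semaphore.acquireUninterruptibly(n);
        synchronized (this) {
            outstanding += n;
        }
    }

    // Throws instead of silently clamping, so an accounting bug surfaces
    // at the point where it happens.
    void release(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("negative release: " + n);
        }
        synchronized (this) {
            if (n > outstanding) {
                throw new IllegalStateException("Releasing " + n
                        + " permits but only " + outstanding + " are outstanding");
            }
            outstanding -= n;
        }
        semaphore.release(n);
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        CheckedPermits permits = new CheckedPermits(10);
        permits.acquire(4);
        permits.release(4);
        try {
            permits.release(4); // duplicate release, as in the suspected bug
        } catch (IllegalStateException e) {
            System.out.println("Detected: " + e.getMessage());
        }
        System.out.println("available=" + permits.availablePermits());
    }
}
```

Because the check runs before `semaphore.release`, a rejected over-release never reaches the semaphore, and a caller that catches the `IllegalStateException` can log it and reset its own state.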
##########
Review comment:
I don't think limiting the counter is a good idea. Making sure we
recover from the exception is a better approach.
##########
Review comment:
The terminology is not correct here. The semaphore *is* bounded (that's
its only purpose): it only allows a fixed number of acquires.
The problem arises when we release it more times than it was
acquired, but that doesn't make it unbounded.
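The distinction can be seen with the JDK class on its own. `java.util.concurrent.Semaphore` bounds acquires by its current permit count, but `release` does not require a matching `acquire`, so an unmatched release raises that count above the initial value. A minimal sketch with hypothetical numbers:

```java
import java.util.concurrent.Semaphore;

class OverReleaseDemo {
    // Counts how many non-blocking single-permit acquires succeed after a
    // semaphore created with maxPermits receives `extra` unmatched releases.
    static int acquirableAfterOverRelease(int maxPermits, int extra) {
        Semaphore s = new Semaphore(maxPermits);
        s.release(extra); // no matching acquire; Semaphore permits this
        int acquired = 0;
        while (s.tryAcquire()) {
            acquired++;
        }
        return acquired;
    }

    public static void main(String[] args) {
        System.out.println(acquirableAfterOverRelease(10, 3)); // prints 13
    }
}
```

With 10 initial permits and 3 unmatched releases, 13 `tryAcquire()` calls succeed: the semaphore is still bounded, just by the wrong number.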
--
This is an automated message from the Apache Git Service.