RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991784145
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
             // chunked message also sent individually so, try to acquire send-permits
             for (int i = 0; i < (totalChunks - 1); i++) {
-                if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                        0 /* The memory was already reserved */)) {
                     client.getMemoryLimitController().releaseMemory(uncompressedSize);
                     semaphoreRelease(i + 1);
                     return;
                 }
             }
Review Comment:
@BewareMyPower Yes, chunks are sent asynchronously, one by one, in the same
thread. Your example, however, reads as if they were sent synchronously, one by
one. As I understand it, your approach allows a case like this:
Chunk 0 before send: Acquire 1 semaphore and max(size...) memory
Chunk 1 before send:
// Here we already occupy 2 semaphores and (size0+size1) memory. It's not the correct behavior.
Chunk 0 after send:
Chunk 1 after send: Release 1 semaphore and max(size...) memory.
...
This doesn't seem to be correct behavior. Please correct me if I'm wrong.
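
To make the timeline concrete, here is a minimal sketch of one reading of it. It is a hypothetical model, not Pulsar code: `ChunkAccountingSketch`, `Peak`, and `simulateAsyncSend` are made-up names, and the chunk sizes 5 and 3 are arbitrary. It assumes every chunk is queued before any acknowledgement arrives, and that each queued chunk occupies one permit plus its own size in bytes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model for illustration only; none of these names exist in Pulsar.
final class ChunkAccountingSketch {

    /** Peak resources occupied by chunks that are queued but not yet acknowledged. */
    static final class Peak {
        final int permits;
        final long bytes;

        Peak(int permits, long bytes) {
            this.permits = permits;
            this.bytes = bytes;
        }
    }

    /**
     * Queues every chunk before any of them completes (asynchronous sends
     * issued back to back from one thread), then completes them in order.
     */
    static Peak simulateAsyncSend(long[] chunkSizes) {
        Queue<Long> inFlight = new ArrayDeque<>();
        int permits = 0;
        int peakPermits = 0;
        long bytes = 0;
        long peakBytes = 0;

        // "Before send" for each chunk: queued without waiting for earlier acks.
        for (long size : chunkSizes) {
            inFlight.add(size);
            permits++;
            bytes += size;
            peakPermits = Math.max(peakPermits, permits);
            peakBytes = Math.max(peakBytes, bytes);
        }

        // "After send": each acknowledgement frees one slot and that chunk's bytes.
        while (!inFlight.isEmpty()) {
            bytes -= inFlight.remove();
            permits--;
        }
        return new Peak(peakPermits, peakBytes);
    }

    public static void main(String[] args) {
        long[] sizes = {5, 3};
        Peak occupied = simulateAsyncSend(sizes);
        // What a "reserve once" scheme would have accounted for: 1 permit, max(size...).
        long accountedBytes = Math.max(sizes[0], sizes[1]);
        System.out.println("accounted: 1 permit, " + accountedBytes + " bytes");
        System.out.println("occupied:  " + occupied.permits + " permits, " + occupied.bytes + " bytes");
    }
}
```

Under these assumptions, two chunks of 5 and 3 bytes occupy 2 permits and 8 bytes at the peak, while a single reservation of 1 permit and max(size...) would account for only 1 permit and 5 bytes.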