RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991784145


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }

Review Comment:
   @BewareMyPower Yes. Chunks are sent asynchronously, one by one, in the same thread. But in your example they appear to be sent synchronously, one by one. My understanding is that your approach could lead to a case like this:
   
   Chunk 0 before send: Acquire 1 semaphore and max(size...) memory
   Chunk 1 before send: 
   // At this point we already hold 2 semaphores and (size0+size1) memory, which is not the correct behavior.
   Chunk 0 after send: 
   Chunk 1 after send: Release 1 semaphore and max(size...) memory.
   ...
   
   Please correct me if I'm wrong.
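
To make the ordering concern concrete, here is a minimal sketch that counts peak permit and memory usage for the two interleavings. It is not Pulsar code: `ChunkAccountingSketch`, `Peak`, `asyncSend` and `syncSend` are hypothetical names, and the sketch assumes every chunk acquires one permit plus its own size before send and releases both after send.

```java
public class ChunkAccountingSketch {

    /** Peak resource usage observed during one simulated message send. */
    public static final class Peak {
        public final int permits;
        public final long memory;

        Peak(int permits, long memory) {
            this.permits = permits;
            this.memory = memory;
        }
    }

    /**
     * Asynchronous case: every "before send" step runs back to back on the
     * caller thread, and the "after send" callbacks only run later.
     */
    public static Peak asyncSend(long[] chunkSizes) {
        int permits = 0;
        long memory = 0;
        int peakPermits = 0;
        long peakMemory = 0;
        for (long size : chunkSizes) {   // before send, chunk by chunk
            permits++;
            memory += size;
            peakPermits = Math.max(peakPermits, permits);
            peakMemory = Math.max(peakMemory, memory);
        }
        for (long size : chunkSizes) {   // after send, once callbacks fire
            permits--;
            memory -= size;
        }
        return new Peak(peakPermits, peakMemory);
    }

    /**
     * Synchronous case: each chunk completes (and releases) before the next
     * chunk starts, so at most one chunk is accounted for at a time.
     */
    public static Peak syncSend(long[] chunkSizes) {
        int peakPermits = 0;
        long peakMemory = 0;
        for (long size : chunkSizes) {
            // acquire 1 permit and `size` memory, then release both
            peakPermits = Math.max(peakPermits, 1);
            peakMemory = Math.max(peakMemory, size);
        }
        return new Peak(peakPermits, peakMemory);
    }

    public static void main(String[] args) {
        long[] sizes = {5, 3};           // size0 and size1, arbitrary values
        Peak a = asyncSend(sizes);
        Peak s = syncSend(sizes);
        System.out.println("async peak: " + a.permits + " permits, " + a.memory + " memory");
        System.out.println("sync peak:  " + s.permits + " permits, " + s.memory + " memory");
    }
}
```

With two chunks, the asynchronous interleaving peaks at 2 permits and size0+size1 memory, while the synchronous one never holds more than a single chunk's resources.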



