BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991522490


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }

Review Comment:
   The root cause is that, for a chunked message with N chunks, the chunks are 
sent asynchronously one by one in the same thread, so the order of events is:
   
   1. Chunk 0 before send: Acquire 1 semaphore and `size0` memory.
   2. Chunk 0 after send: Release 1 semaphore and `size0` memory.
   3. Chunk 1 before send: Acquire 1 semaphore and `size1` memory.
   4. Chunk 1 after send: Release 1 semaphore and `size1` memory. 
   5. ...
   
   In this case, we don't need N spots from the semaphore and `sum(size0, 
size1, ...)` bytes from the memory limiter. Instead, we only need 1 spot from 
the semaphore and `max(size0, size1, ...)` bytes from the memory limiter.
   
   Therefore, we should not call `canEnqueueRequest` up front for chunk 1 and 
later, whether or not `blockIfQueueFull` is enabled. Instead, we should call 
`canEnqueueRequest` in each iteration of the loop that sends the chunks.
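
The accounting argument above can be checked with a small standalone sketch. It is not Pulsar code: `ChunkBudgetSketch`, `upFrontBudget`, and `sequentialPeakBudget` are hypothetical names, and it assumes the timeline listed above holds exactly, i.e. each chunk releases its permit and memory before the next chunk acquires.

```java
import java.util.Arrays;

// Hypothetical illustration only; none of these names exist in the Pulsar client.
class ChunkBudgetSketch {

    /** Budget when every chunk reserves up front: {N permits, sum of sizes}. */
    public static long[] upFrontBudget(long[] chunkSizes) {
        long totalMemory = Arrays.stream(chunkSizes).sum();
        return new long[] {chunkSizes.length, totalMemory};
    }

    /**
     * Peak budget when each chunk acquires just before its send and releases
     * right after it (the assumed timeline): {1 permit, max of sizes}.
     */
    public static long[] sequentialPeakBudget(long[] chunkSizes) {
        long permits = 0;
        long memory = 0;
        long peakPermits = 0;
        long peakMemory = 0;
        for (long size : chunkSizes) {
            // Before send: acquire 1 permit and `size` bytes.
            permits += 1;
            memory += size;
            peakPermits = Math.max(peakPermits, permits);
            peakMemory = Math.max(peakMemory, memory);
            // After send: release both before the next chunk starts.
            permits -= 1;
            memory -= size;
        }
        return new long[] {peakPermits, peakMemory};
    }

    public static void main(String[] args) {
        long[] sizes = {5, 3, 4};
        System.out.println(Arrays.toString(upFrontBudget(sizes)));        // prints [3, 12]
        System.out.println(Arrays.toString(sequentialPeakBudget(sizes))); // prints [1, 5]
    }
}
```

With three chunks of 5, 3, and 4 bytes, reserving up front needs 3 permits and 12 bytes, while acquiring per iteration peaks at 1 permit and 5 bytes.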


