wenbingshen opened a new pull request, #21790:
URL: https://github.com/apache/pulsar/pull/21790

   ### Motivation
   
   Without the change in this PR, the following unit test fails: the main 
producer thread blocks forever in 
org.apache.pulsar.client.impl.MemoryLimitController#reserveMemory at 
condition.await();
   
   ```java
       @Test(timeOut = 10_000)
       public void testProducerBlockReserveMemory() throws Exception {
           replacePulsarClient(PulsarClient.builder().
                   serviceUrl(lookupUrl.toString())
                   .memoryLimit(1, SizeUnit.KILO_BYTES));
           @Cleanup
           ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
                   .topic("testProducerMemoryLimit")
                   .sendTimeout(5, TimeUnit.SECONDS)
                   .compressionType(CompressionType.SNAPPY)
                   .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                   .maxPendingMessages(0)
                   .blockIfQueueFull(true)
                   .enableBatching(true)
                   .batchingMaxMessages(100)
                   .batchingMaxBytes(65536)
                   .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                   .create();
           int msgCount = 5;
           CountDownLatch cdl = new CountDownLatch(msgCount);
           for (int i = 0; i < msgCount; i++) {
               producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8))
                       .whenComplete((messageId, throwable) -> {
                           cdl.countDown();
                       });
           }
   
           cdl.await();
   
           producer.close();
           PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
           final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
           Assert.assertEquals(memoryLimitController.currentUsage(), 0);
       }
   ```
   
   According to my investigation, this is related to PR #17936.
   
   1. The producer enables the following options and sends asynchronously:
                    .compressionType(CompressionType.SNAPPY)
                    .blockIfQueueFull(true)
                    .enableBatching(true)
   Setting .memoryLimit(1, SizeUnit.KILO_BYTES) to a small value makes the 
problem easier to reproduce.
   
   2. BatchMessageContainer reserves batchAllocatedSize from 
MemoryLimitController when it builds BatchMsgMetadata for the first time. At 
this point, MemoryLimitController.currentUsage = (msg payload size + 
batchedMessageMetadataAndPayload size).
   
   3. The main producer thread keeps sending asynchronously, but the 
MemoryLimitController has already reached memoryLimit, so the main thread 
blocks in condition.await(); until it is woken up.
   4. When compression of the batch completes, BatchMessageContainer calls 
updateAndReserveBatchAllocatedSize again. This returns memory to the 
MemoryLimitController so that the reservation matches the actual batch size, 
but it does not wake up the threads that are blocked waiting for memory.
   5. After the batch is sent, 
org.apache.pulsar.client.impl.MemoryLimitController#releaseMemory is called to 
release the batch's memory. However, because 
MemoryLimitController.currentUsage plus the released size is smaller than 
memoryLimit, no threads are awakened.
   
   6. The main producer thread therefore stays stuck in condition.await(); 
forever.
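
   The missed wake-up in steps 2 to 6 can be illustrated with a small 
stand-alone model. This is only a sketch under stated assumptions: 
`SketchMemoryLimiter`, its methods, and the byte counts (a 1024-byte limit, a 
2000-byte estimate shrinking to 500 bytes) are hypothetical and are not 
Pulsar's MemoryLimitController. It assumes that the shrink in step 4 adjusts 
usage without signalling and that a release signals only when it crosses the 
limit downwards.

   ```java
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;

   // Simplified, hypothetical model of a blocking memory limiter.
   // It is NOT Pulsar's MemoryLimitController; names and numbers are illustrative.
   class SketchMemoryLimiter {
       private final long limit;
       private long usage;      // guarded by mutex
       private int wakeUps;     // counts signalAll() calls, for demonstration only
       private final ReentrantLock mutex = new ReentrantLock();
       private final Condition spaceFreed = mutex.newCondition();

       SketchMemoryLimiter(long limit) {
           this.limit = limit;
       }

       // Blocks while usage is already above the limit (steps 3 and 6).
       void reserve(long size) throws InterruptedException {
           mutex.lock();
           try {
               while (usage > limit) {
                   spaceFreed.await();
               }
               usage += size;
           } finally {
               mutex.unlock();
           }
       }

       // Adjusts usage by delta without ever signalling, even when delta < 0 (step 4).
       void forceReserve(long delta) {
           mutex.lock();
           try {
               usage += delta;
           } finally {
               mutex.unlock();
           }
       }

       // Signals waiters only when this release crosses the limit downwards (step 5).
       void release(long size) {
           mutex.lock();
           try {
               long before = usage;
               usage -= size;
               if (before > limit && usage <= limit) {
                   wakeUps++;
                   spaceFreed.signalAll();
               }
           } finally {
               mutex.unlock();
           }
       }

       // Replays steps 2, 4 and 5 on one thread and returns the number of wake-ups issued.
       static int wakeUpsAfterBatch(boolean shrinkViaRelease) {
           SketchMemoryLimiter limiter = new SketchMemoryLimiter(1024);
           limiter.forceReserve(2000);          // step 2: estimated batch size exceeds the limit
           if (shrinkViaRelease) {
               limiter.release(1500);           // shrink through the signalling path
           } else {
               limiter.forceReserve(-1500);     // step 4: shrink silently
           }
           limiter.release(500);                // step 5: batch sent, remaining memory released
           return limiter.wakeUps;
       }

       public static void main(String[] args) {
           System.out.println("silent shrink wake-ups: " + wakeUpsAfterBatch(false));
           System.out.println("signalling shrink wake-ups: " + wakeUpsAfterBatch(true));
       }
   }
   ```

   In this model the silent shrink issues no wake-up at all, because usage is 
already below the limit when the final release runs, so a thread parked in 
`reserve` would never be signalled. Routing the shrink through the signalling 
path issues one wake-up. That is one possible remedy in the model, not 
necessarily the exact change made by this PR.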
   
   
   
   ### Documentation
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
