wenbingshen opened a new pull request, #21790:
URL: https://github.com/apache/pulsar/pull/21790
### Motivation
When the change in this PR is reverted, the following unit test fails: the main producer thread blocks forever on `condition.await()` in `org.apache.pulsar.client.impl.MemoryLimitController#reserveMemory`, so the test hits its timeout.
```java
@Test(timeOut = 10_000)
public void testProducerBlockReserveMemory() throws Exception {
    // Use a very small client-wide memory limit so the problem reproduces quickly
    replacePulsarClient(PulsarClient.builder()
            .serviceUrl(lookupUrl.toString())
            .memoryLimit(1, SizeUnit.KILO_BYTES));
    @Cleanup
    ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
            .topic("testProducerMemoryLimit")
            .sendTimeout(5, TimeUnit.SECONDS)
            .compressionType(CompressionType.SNAPPY)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .maxPendingMessages(0)
            .blockIfQueueFull(true)
            .enableBatching(true)
            .batchingMaxMessages(100)
            .batchingMaxBytes(65536)
            .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
            .create();
    int msgCount = 5;
    CountDownLatch cdl = new CountDownLatch(msgCount);
    for (int i = 0; i < msgCount; i++) {
        // Without the fix, the calling thread gets stuck in MemoryLimitController#reserveMemory here
        producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8))
                .whenComplete((messageId, throwable) -> cdl.countDown());
    }
    cdl.await();
    producer.close();
    // Once the producer is closed, all reserved memory must have been released
    PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
    final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
    Assert.assertEquals(memoryLimitController.currentUsage(), 0);
}
```
According to my investigation, this is related to PR #17936.
1. The producer enables `.compressionType(CompressionType.SNAPPY)`, `.blockIfQueueFull(true)` and `.enableBatching(true)`, and sends messages asynchronously. The client's `.memoryLimit(1, SizeUnit.KILO_BYTES)` is set small enough to make the problem easy to reproduce.
2. When `BatchMessageContainer` builds `BatchMsgMetadata` for the first time, it reserves `batchAllocatedSize` from `MemoryLimitController`. At this point, `MemoryLimitController.currentUsage` = message payload size + `batchedMessageMetadataAndPayload` size.
3. The main producer thread keeps sending asynchronously, but `MemoryLimitController` has now reached `memoryLimit`, so the thread blocks in `condition.await()` waiting to be woken up.
4. When batch compression completes, `BatchMessageContainer` calls `updateAndReserveBatchAllocatedSize` again. This shrinks the reservation held in `MemoryLimitController` down to the actual batch size, returning memory to the controller, but it does not wake up the threads that are blocked waiting for memory.
5. After the batch is sent, `org.apache.pulsar.client.impl.MemoryLimitController#releaseMemory` is called to release the batch's memory. However, because `currentUsage` plus the released size is smaller than `memoryLimit`, no threads are woken up.
6. The main producer thread therefore stays stuck in `condition.await()` forever.
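Steps 2 to 5 can be reproduced without a broker in a small model. The sketch below is hypothetical: `LostWakeupDemo` and `ToyMemoryLimitController` are illustrative names, not Pulsar classes. It assumes, as steps 3 to 5 describe, that a blocked reservation only re-checks after a signal, that the post-compression shrink lowers usage without signalling, and that `releaseMemory` signals only when usage crosses from above `memoryLimit` to at or below it. The byte counts (a 1024-byte limit, 100-byte payloads, a 2000-byte batch allocation shrinking to 50 bytes) are made up for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of steps 2-5; NOT org.apache.pulsar.client.impl.MemoryLimitController.
public class LostWakeupDemo {
    static class ToyMemoryLimitController {
        final long memoryLimit;
        final AtomicLong currentUsage = new AtomicLong();
        final ReentrantLock mutex = new ReentrantLock();
        final Condition condition = mutex.newCondition();
        ToyMemoryLimitController(long memoryLimit) { this.memoryLimit = memoryLimit; }
        // Succeeds unless usage is already over the limit (one request may overshoot it).
        boolean tryReserveMemory(long size) {
            while (true) {
                long current = currentUsage.get();
                if (current > memoryLimit) {
                    return false;
                }
                if (currentUsage.compareAndSet(current, current + size)) {
                    return true;
                }
            }
        }
        // Step 3: the caller parks on the condition and only re-checks after a signal.
        void reserveMemory(long size) throws InterruptedException {
            mutex.lock();
            try {
                while (!tryReserveMemory(size)) {
                    condition.await();
                }
            } finally {
                mutex.unlock();
            }
        }
        // Step 4: adjusts usage by a (possibly negative) delta WITHOUT signalling waiters.
        void forceReserveMemory(long delta) {
            currentUsage.addAndGet(delta);
        }
        // Step 5: signals only when this release crosses from above the limit to at/below it.
        void releaseMemory(long size) {
            long newUsage = currentUsage.addAndGet(-size);
            if (newUsage + size > memoryLimit && newUsage <= memoryLimit) {
                mutex.lock();
                try {
                    condition.signalAll();
                } finally {
                    mutex.unlock();
                }
            }
        }
        // Demo helper: true once some thread is parked in condition.await().
        boolean hasWaiters() {
            mutex.lock();
            try {
                return mutex.hasWaiters(condition);
            } finally {
                mutex.unlock();
            }
        }
    }

    // Returns true if the blocked producer thread was woken and completed its reservation.
    static boolean runScenario(boolean shrinkViaRelease) throws InterruptedException {
        ToyMemoryLimitController c = new ToyMemoryLimitController(1024);
        c.reserveMemory(100);          // first message payload
        c.forceReserveMemory(2000);    // step 2: batch allocation, usage is now 2100
        Thread producer = new Thread(() -> {
            try {
                c.reserveMemory(100);  // step 3: usage > limit, so this parks
            } catch (InterruptedException e) {
                // interrupted below if it never woke up
            }
        });
        producer.start();
        while (!c.hasWaiters()) {
            Thread.sleep(1);           // wait until the producer is parked in await()
        }
        if (shrinkViaRelease) {
            c.releaseMemory(1950);       // contrast: shrink through the signalling path
        } else {
            c.forceReserveMemory(-1950); // step 4: shrink to the compressed size (50), no signal
        }
        c.releaseMemory(150);          // step 5: batch sent, release 50 + 100; usage is now 0
        producer.join(TimeUnit.SECONDS.toMillis(1));
        boolean woken = !producer.isAlive();
        producer.interrupt();          // unblock a stuck thread so the demo can exit
        producer.join();
        return woken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shrink without signal    -> woken: " + runScenario(false));
        System.out.println("shrink via releaseMemory -> woken: " + runScenario(true));
    }
}
```

Run as-is, it prints `woken: false` for the first scenario and `woken: true` for the second. In the first case usage ends at 0 while the producer is still parked, which is the lost wake-up from step 6. Routing the shrink through the signalling release path is only one way to let the crossing check fire; it is shown for contrast and is not necessarily what this PR changes.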
### Documentation
- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`