RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991779036
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
}
}
+ @Test
+ public void testBlockIfQueueFUllWhenChunking() throws Exception {
+ this.conf.setMaxMessageSize(50);
+
+ @Cleanup
+ final Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic("my-property/my-ns/test-chunk-size")
+ .enableChunking(true)
+ .enableBatching(false)
+ .blockIfQueueFull(true)
+ .maxPendingMessages(3)
+ .create();
+
+ // Test sending large message (totalChunks > maxPendingMessages)
should not cause deadlock
+ // We need to use a separate thread to send the message instead of
using the sendAsync, because the deadlock
+ // might happen before publishing messages to the broker.
+ CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+ try {
+ producer.send(createMessagePayload(200));
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ try {
+ sendMsg.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("Deadlock detected when sending large message.");
+ }
+
+ // Test sending multiple large messages (For every message,
totalChunks < maxPendingMessages) concurrently
+ // should not cause the deadlock.
+ List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();
Review Comment:
It's already imported. The compilation was fine in my repo:
https://github.com/RobertIndie/pulsar/pull/5 I am investigating the CI problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]