This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ce9fe54441aad610f530050e4649a513ed3a62d5 Author: wuxuanqicn <[email protected]> AuthorDate: Sat Mar 12 13:50:05 2022 +0800 Fixed flaky test MemoryLimitTest#testRejectMessages (#14220) (#14628) Co-authored-by: xuanqi.wu <[email protected]> (cherry picked from commit 5f8db372ee3926f93eb109ab3b713038c3b523c8) --- .../apache/pulsar/client/api/MemoryLimitTest.java | 86 ++++++++++++---------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java index ec98e7d..431991e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java @@ -18,20 +18,22 @@ */ package org.apache.pulsar.client.api; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; - -import java.util.concurrent.CountDownLatch; - import lombok.Cleanup; - import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError; -import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarTestClient; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + @Test(groups = "broker-api") public class MemoryLimitTest extends ProducerConsumerBase { @@ -62,27 +64,30 @@ public class MemoryLimitTest extends ProducerConsumerBase { throws Exception { String topic = newTopicName(); - @Cleanup - PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + ClientBuilder clientBuilder = PulsarClient.builder() .serviceUrl(pulsar.getBrokerServiceUrl()) - .memoryLimit(100, SizeUnit.KILO_BYTES) - .build(); + .memoryLimit(100, SizeUnit.KILO_BYTES); @Cleanup - Producer<byte[]> producer = client.newProducer() + PulsarTestClient client = PulsarTestClient.create(clientBuilder); + + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer() .topic(topic) .blockIfQueueFull(false) + .sendTimeout(5, TimeUnit.SECONDS) .create(); + // make sure all message pending at pendingMessages queue + // connection with broker can not be established, so handleSendReceipt will not be invoked while sending message + client.dropOpSendMessages(); final int n = 101; - CountDownLatch latch = new CountDownLatch(n); - for (int i = 0; i < n; i++) { - producer.sendAsync(new byte[1024]).thenRun(() -> { - latch.countDown(); - }); + producer.sendAsync(new byte[1024]); } - + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .until(() -> producer.getPendingQueueSize() == n); assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024); try { @@ -92,8 +97,10 @@ public class MemoryLimitTest extends ProducerConsumerBase { // Expected } - latch.await(); - + client.allowReconnecting(); + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .until(() -> producer.getPendingQueueSize() == 0); assertEquals(client.getMemoryLimitController().currentUsage(), 0); // We should now be able to send again @@ -105,41 +112,40 @@ public class MemoryLimitTest extends ProducerConsumerBase { String t1 = newTopicName(); String t2 = newTopicName(); - @Cleanup - PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + ClientBuilder clientBuilder = PulsarClient.builder() .serviceUrl(pulsar.getBrokerServiceUrl()) - .memoryLimit(100, SizeUnit.KILO_BYTES) - .build(); + .memoryLimit(100, SizeUnit.KILO_BYTES); + + @Cleanup + PulsarTestClient client = PulsarTestClient.create(clientBuilder); @Cleanup - Producer<byte[]> p1 = client.newProducer() + ProducerImpl<byte[]> p1 = (ProducerImpl<byte[]>) client.newProducer() .topic(t1) .blockIfQueueFull(false) + .sendTimeout(5, TimeUnit.SECONDS) .create(); @Cleanup - Producer<byte[]> p2 = client.newProducer() + ProducerImpl<byte[]> p2 = (ProducerImpl<byte[]>) client.newProducer() .topic(t2) .blockIfQueueFull(false) + .sendTimeout(5, TimeUnit.SECONDS) .create(); + client.dropOpSendMessages(); final int n = 101; - CountDownLatch latch = new CountDownLatch(n); - for (int i = 0; i < n / 2; i++) { - p1.sendAsync(new byte[1024]).thenRun(() -> { - latch.countDown(); - }); - p2.sendAsync(new byte[1024]).thenRun(() -> { - latch.countDown(); - }); + p1.sendAsync(new byte[1024]); + p2.sendAsync(new byte[1024]); } // Last message in order to reach the limit - p1.sendAsync(new byte[1024]).thenRun(() -> { - latch.countDown(); - }); + p1.sendAsync(new byte[1024]); + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .until(() -> (p1.getPendingQueueSize() + p2.getPendingQueueSize()) == n); assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024); try { @@ -156,8 +162,10 @@ public class MemoryLimitTest extends ProducerConsumerBase { // Expected } - latch.await(); - + client.allowReconnecting(); + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .until(() -> (p1.getPendingQueueSize() + p2.getPendingQueueSize()) == 0); assertEquals(client.getMemoryLimitController().currentUsage(), 0); // We should now be able to send again
