This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b45a65  [Issue 9451] Fix flaky test SimpleProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9575)
8b45a65 is described below

commit 8b45a659d5f68d6f03f7497b4ca7b15628c4a281
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Feb 12 21:09:13 2021 +0200

    [Issue 9451] Fix flaky test SimpleProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9575)
    
    - use Awaitility to wait for assertions to pass
---
 .../client/api/SimpleProducerConsumerTest.java     | 41 +++++++++++-----------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9465d56..22b3e08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -760,12 +760,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         barrier.await();
-        // there will be 10 threads calling receive() from the same consumer and will block
-        Thread.sleep(100);
 
         // we restart the broker to reconnect
         restartBroker();
-        Thread.sleep(2000);
 
         // publish 100 messages so that the consumers blocked on receive() will now get the messages
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
@@ -781,12 +778,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
-        Thread.sleep(500);
 
         ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
-        // The available permits should be 10 and num messages in the queue should be 90
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
-        Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+
+        Awaitility.await().untilAsserted(() -> {
+            // The available permits should be 10 and num messages in the queue should be 90
+            Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
+            Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+        });
 
         barrier.reset();
         for (int i = 0; i < numConsumersThreads; i++) {
@@ -797,11 +796,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             });
         }
         barrier.await();
-        Thread.sleep(100);
 
-        // The available permits should be 20 and num messages in the queue should be 80
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
-        Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
+        Awaitility.await().untilAsserted(() -> {
+            // The available permits should be 20 and num messages in the queue should be 80
+            Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
+            Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
+        });
 
         // clear the queue
         while (true) {
@@ -811,9 +811,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             }
         }
 
-        // The available permits should be 0 and num messages in the queue should be 0
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
-        Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
+        Awaitility.await().untilAsserted(() -> {
+            // The available permits should be 0 and num messages in the queue should be 0
+            Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
+            Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
+        });
 
         barrier.reset();
         for (int i = 0; i < numConsumersThreads; i++) {
@@ -824,15 +826,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             });
         }
         barrier.await();
-        // we again make 10 threads call receive() and get blocked
-        Thread.sleep(100);
 
         restartBroker();
-        Thread.sleep(2000);
 
-        // The available permits should be 10 and num messages in the queue should be 90
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
-        Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+        Awaitility.await().untilAsserted(() -> {
+            // The available permits should be 10 and num messages in the queue should be 90
+            Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
+            Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+        });
         consumer.close();
         executor.shutdown();
     }

Reply via email to