This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7503b14  fix flaky-test: 
V1_ProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9435)
7503b14 is described below

commit 7503b146d7eee2c2ccf6b59c352cdfc7bde7fda9
Author: feynmanlin <[email protected]>
AuthorDate: Fri Feb 5 18:58:01 2021 +0800

    fix flaky-test: 
V1_ProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9435)
---
 .../pulsar/client/api/v1/V1_ProducerConsumerTest.java       | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 2baec56..6f00f0e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -538,11 +539,11 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
-        Thread.sleep(500);
 
         ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
         // The available permits should be 10 and num messages in the queue 
should be 90
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), 
numConsumersThreads);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(consumerImpl.getAvailablePermits(), 
numConsumersThreads));
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - 
numConsumersThreads);
 
         barrier.reset();
@@ -557,10 +558,10 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             });
         }
         barrier.await();
-        Thread.sleep(100);
 
         // The available permits should be 20 and num messages in the queue 
should be 80
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), 
numConsumersThreads * 2);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(consumerImpl.getAvailablePermits(), 
numConsumersThreads * 2));
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - 
(numConsumersThreads * 2));
 
         // clear the queue
@@ -591,10 +592,10 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
         Thread.sleep(100);
 
         restartBroker();
-        Thread.sleep(2000);
 
         // The available permits should be 10 and num messages in the queue 
should be 90
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), 
numConsumersThreads);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(consumerImpl.getAvailablePermits(), 
numConsumersThreads));
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - 
numConsumersThreads);
         consumer.close();
 

Reply via email to