This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7503b14 fix flaky-test: V1_ProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9435)
7503b14 is described below
commit 7503b146d7eee2c2ccf6b59c352cdfc7bde7fda9
Author: feynmanlin <[email protected]>
AuthorDate: Fri Feb 5 18:58:01 2021 +0800
fix flaky-test: V1_ProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9435)
---
.../pulsar/client/api/v1/V1_ProducerConsumerTest.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 2baec56..6f00f0e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -538,11 +539,11 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
- Thread.sleep(500);
ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
// The available permits should be 10 and num messages in the queue
should be 90
- Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize -
numConsumersThreads);
barrier.reset();
@@ -557,10 +558,10 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
});
}
barrier.await();
- Thread.sleep(100);
// The available permits should be 20 and num messages in the queue
should be 80
- Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads * 2);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads * 2));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize -
(numConsumersThreads * 2));
// clear the queue
@@ -591,10 +592,10 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Thread.sleep(100);
restartBroker();
- Thread.sleep(2000);
// The available permits should be 10 and num messages in the queue
should be 90
- Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize -
numConsumersThreads);
consumer.close();