This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8b45a65 [Issue 9451] Fix flaky test SimpleProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9575)
8b45a65 is described below
commit 8b45a659d5f68d6f03f7497b4ca7b15628c4a281
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Feb 12 21:09:13 2021 +0200
[Issue 9451] Fix flaky test SimpleProducerConsumerTest.testConcurrentConsumerReceiveWhileReconnect (#9575)
- use Awaitility to wait for assertions to pass
---
.../client/api/SimpleProducerConsumerTest.java | 41 +++++++++++-----------
1 file changed, 21 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9465d56..22b3e08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -760,12 +760,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
barrier.await();
- // there will be 10 threads calling receive() from the same consumer and will block
- Thread.sleep(100);
// we restart the broker to reconnect
restartBroker();
- Thread.sleep(2000);
// publish 100 messages so that the consumers blocked on receive() will now get the messages
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
@@ -781,12 +778,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
- Thread.sleep(500);
ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
- // The available permits should be 10 and num messages in the queue should be 90
- Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
- Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+
+ Awaitility.await().untilAsserted(() -> {
+ // The available permits should be 10 and num messages in the queue should be 90
+ Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
+ Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+ });
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
@@ -797,11 +796,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
});
}
barrier.await();
- Thread.sleep(100);
- // The available permits should be 20 and num messages in the queue should be 80
- Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
- Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
+ Awaitility.await().untilAsserted(() -> {
+ // The available permits should be 20 and num messages in the queue should be 80
+ Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
+ Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
+ });
// clear the queue
while (true) {
@@ -811,9 +811,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
}
- // The available permits should be 0 and num messages in the queue should be 0
- Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
- Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
+ Awaitility.await().untilAsserted(() -> {
+ // The available permits should be 0 and num messages in the queue should be 0
+ Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
+ Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
+ });
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
@@ -824,15 +826,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
});
}
barrier.await();
- // we again make 10 threads call receive() and get blocked
- Thread.sleep(100);
restartBroker();
- Thread.sleep(2000);
- // The available permits should be 10 and num messages in the queue should be 90
- Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
- Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+ Awaitility.await().untilAsserted(() -> {
+ // The available permits should be 10 and num messages in the queue should be 90
+ Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
+ Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
+ });
consumer.close();
executor.shutdown();
}