This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3fc367d52b58f1a14fd6f433060d236aff269676
Author: lipenghui <[email protected]>
AuthorDate: Wed Jul 13 00:29:58 2022 +0800

    [fix][flaky-test] PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#16493)

    (cherry picked from commit 8a54fd944e4418f619954f09f5552ecd962bebf9)
---
 .../broker/service/PersistentFailoverE2ETest.java | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index c10742c59e6..9fe83f7b11d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -184,8 +185,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
         rolloverPerIntervalStats();
-        assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+        });
         // 3. consumer1 should have all the messages while consumer2 should have no messages
         Message<byte[]> msg = null;
@@ -200,8 +202,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
         rolloverPerIntervalStats();
         // 4. messages deleted on individual acks
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
-        assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        });
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
@@ -224,10 +227,12 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             // do not ack
         }
         consumer1.close();
-        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
-        verifyConsumerActive(listener2, -1);
-        verifyConsumerNotReceiveAnyStateChanges(listener1);
+        Awaitility.await().untilAsserted(() -> {
+            verifyConsumerActive(listener2, -1);
+            verifyConsumerNotReceiveAnyStateChanges(listener1);
+        });
+
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -237,8 +242,10 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
         Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
         rolloverPerIntervalStats();
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
-        assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+
+        });
         // 8. unsubscribe not allowed if multiple consumers connected
         try {
@@ -257,9 +264,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             fail("Should not fail", e);
         }
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
-        subRef = topicRef.getSubscription(subName);
-        assertNull(subRef);
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(topicRef.getSubscription(subName));
+        });
         producer.close();
         consumer2.close();
