This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ba8d20  [tests] use Awaitility replace Thread.sleep for pulsar-broker. (#11281)
3ba8d20 is described below

commit 3ba8d202246996416e34b325f34a1965510cd87e
Author: YANGLiiN <ie...@qq.com>
AuthorDate: Mon Jul 19 22:12:01 2021 +0800

    [tests] use Awaitility replace Thread.sleep for pulsar-broker. (#11281)
---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  4 +-
 .../broker/admin/AdminApiGetLastMessageIdTest.java | 34 ++++++------
 .../broker/admin/AdminApiSchemaAutoUpdateTest.java | 16 ++----
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 45 ++++------------
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 50 ++++++++---------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  8 ++-
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   | 18 ++-----
 .../AntiAffinityNamespaceGroupTest.java            | 28 ++++------
 .../loadbalance/ModularLoadManagerImplTest.java    | 10 +---
 .../broker/namespace/OwnershipCacheTest.java       | 11 ++--
 .../GracefulExecutorServicesShutdownTest.java      |  3 +-
 .../broker/service/ReplicatorSubscriptionTest.java |  9 ++--
 .../pulsar/broker/service/ServerCnxTest.java       |  4 +-
 .../broker/service/SubscriptionSeekTest.java       | 31 +++++------
 .../broker/service/TopicTerminationTest.java       | 19 +++----
 .../broker/transaction/TransactionProduceTest.java | 25 +++------
 .../pulsar/client/api/NonPersistentTopicTest.java  | 14 ++---
 .../client/api/SimpleProducerConsumerTest.java     | 63 +++++++++++-----------
 18 files changed, 156 insertions(+), 236 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2a648ea..422ce53 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -946,9 +946,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger.addEntry("dummy-entry-7".getBytes(Encoding));
 
         // Verify that GC trimming kicks in
-        while (ledger.getNumberOfEntries() > 2) {
-            Thread.sleep(10);
-        }
+        Awaitility.await().until(() -> ledger.getNumberOfEntries() <= 2);
     }
 
     @Test(timeOut = 20000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index 3e85523..cba0686 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -18,7 +18,21 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+import javax.ws.rs.core.UriInfo;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -41,22 +55,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-import javax.ws.rs.core.UriInfo;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
 @Test(groups = "broker")
 public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest {
 
@@ -204,9 +202,7 @@ public class AdminApiGetLastMessageIdTest extends 
MockedPulsarServiceBaseTest {
         }
 
         persistentTopics.getLastMessageId(asyncResponse, "prop", "ns-abc", 
"my-topic", true);
-        while (id[0] == null) {
-            Thread.sleep(1);
-        }
+        Awaitility.await().until(() -> id[0] != null);
         Assert.assertTrue(((MessageIdImpl)id[0]).getLedgerId() >= 0);
         Assert.assertEquals(numberOfMessages-1, 
((MessageIdImpl)id[0]).getEntryId());
         messageId = id[0];
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
index ec2b1e8..891e7ac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -18,25 +18,20 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import org.apache.avro.reflect.AvroAlias;
-import org.apache.avro.reflect.AvroDefault;
 import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
-
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.avro.reflect.AvroAlias;
+import org.apache.avro.reflect.AvroDefault;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
-
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -192,7 +187,6 @@ public class AdminApiSchemaAutoUpdateTest extends 
MockedPulsarServiceBaseTest {
             if (strategy.get(t) == SchemaCompatibilityStrategy.FULL) {
                 break;
             }
-            Thread.sleep(100);
         }
         log.info("try with fully compat, again");
         try (Producer<V4Data> p = 
pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7ac114e..c1b8b66 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -263,9 +263,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                         .build());
 
         admin.clusters().deleteCluster("usw");
-        Thread.sleep(300);
-
-        assertEquals(admin.clusters().getClusters(), 
Lists.newArrayList("test"));
+        Awaitility.await()
+                .untilAsserted(() -> 
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")));
 
         admin.namespaces().deleteNamespace("prop-xyz/ns1");
         admin.clusters().deleteCluster("test");
@@ -598,13 +597,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMap),
                 Optional.empty()).join();
         // wait config to be updated
-        for (int i = 0; i < 5; i++) {
-            if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != 
newValue) {
-                Thread.sleep(100 + (i * 10));
-            } else {
-                break;
-            }
-        }
+        Awaitility.await().until(() -> 
pulsar.getConfiguration().getBrokerShutdownTimeoutMs() == newValue);
         // verify value is updated
         assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), 
newValue);
     }
@@ -1588,15 +1581,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         LOG.info("--- RELOAD ---");
 
         // Force reload of namespace and wait for topic to be ready
-        for (int i = 0; i < 30; i++) {
-            try {
-                admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
-                break;
-            } catch (PulsarAdminException e) {
-                LOG.warn("Failed to get topic stats.. {}", e.getMessage());
-                Thread.sleep(1000);
-            }
-        }
+        Awaitility.await().timeout(30, 
TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class)
+                .until(() -> 
admin.topics().getStats("persistent://prop-xyz/ns1/ds2") != null);
 
         admin.topics().deleteSubscription("persistent://prop-xyz/ns1/ds2", 
"my-sub");
         admin.topics().delete("persistent://prop-xyz/ns1/ds2");
@@ -1646,15 +1632,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         LOG.info("--- RELOAD ---");
 
         // Force reload of namespace and wait for topic to be ready
-        for (int i = 0; i < 30; i++) {
-            try {
-                
admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2");
-                break;
-            } catch (PulsarAdminException e) {
-                LOG.warn("Failed to get topic stats.. {}", e.getMessage());
-                Thread.sleep(1000);
-            }
-        }
+        Awaitility.await().timeout(30, 
TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class)
+                .until(() -> 
admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2") != null);
 
         
admin.topics().deleteSubscription("persistent://prop-xyz/ns1-bundles/ds2", 
"my-sub");
         admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2");
@@ -2847,13 +2826,9 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals((int) 
admin.namespaces().getSubscriptionExpirationTime(namespace2), 1);
         
Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(namespace3));
 
-        Thread.sleep(60000);
-        for (int i = 0; i < 60; i++) {
-            if (admin.topics().getSubscriptions(topic2).size() == 0) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
+
+        Awaitility.await().timeout(120, TimeUnit.SECONDS)
+                .until(() -> admin.topics().getSubscriptions(topic2).size() == 
0);
         Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1);
         Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0);
         Assert.assertEquals(admin.topics().getSubscriptions(topic3).size(), 1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index b67939d..f07a2ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -484,11 +484,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         // unload the topic
         unloadTopic(topicName);
         // producer will retry and recreate the topic
-        for (int i = 0; i < 5; i++) {
-            if 
(!pulsar.getBrokerService().getTopicReference(topicName).isPresent() || i != 4) 
{
-                Thread.sleep(200);
-            }
-        }
+        Awaitility.await().until(() -> 
pulsar.getBrokerService().getTopicReference(topicName).isPresent());
         // topic should be loaded by this time
         topic = pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topic);
@@ -1255,12 +1251,12 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         consumer.acknowledge(message);
 
         // wait for ack send
-        Thread.sleep(500);
-
-        // Consumer acks the message, so the precise backlog is 0
-        topicStats = admin.topics().getStats(topic, true, true);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 0);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 0);
+        Awaitility.await().untilAsserted(() -> {
+            // Consumer acks the message, so the precise backlog is 0
+            TopicStats topicStats2 = admin.topics().getStats(topic, true, 
true);
+            
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 0);
+            
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 0);
+        });
 
         topicStats = admin.topics().getStats(topic);
         
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 9);
@@ -1302,20 +1298,24 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         // Wait for messages to be tracked for delayed delivery. This happens
         // on the consumer dispatch side, so when the send() is complete we're
         // not yet guaranteed to see the stats updated.
-        Thread.sleep(500);
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats topicStats = admin.topics().getStats(topic, true, true);
+            
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
+            
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 5);
+        });
 
-        TopicStats topicStats = admin.topics().getStats(topic, true, true);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 5);
 
         for (int i = 0; i < 5; i++) {
             consumer.acknowledge(consumer.receive());
         }
+
         // Wait the ack send.
-        Thread.sleep(500);
-        topicStats = admin.topics().getStats(topic, true, true);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 0);
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats topicStats = admin.topics().getStats(topic, true, true);
+            
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5);
+            
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 0);
+        });
+
     }
 
     @Test
@@ -1403,11 +1403,13 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
             consumer.acknowledge(consumer.receive());
         }
         // Wait the ack send.
-        Thread.sleep(500);
-        topicStats = admin.topics().getPartitionedStats(topic, false, true, 
true);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 238);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 0);
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, 
false, true, true);
+            
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
+            
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 238);
+            
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 0);
+        });
+
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b2eda82..de0ed94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -85,7 +85,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -102,6 +101,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
+import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -1246,8 +1246,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         // Subscribe Rate Limiter is enabled, will limited by broker
         pulsarClient.updateServiceUrl(lookupUrl.toString());
-        Thread.sleep(1000L);
-        assertFalse(consumer.isConnected());
+        Awaitility.await().untilAsserted(() -> 
assertFalse(consumer.isConnected()));
 
         // Out of limit period
         Thread.sleep(6000L);
@@ -1258,8 +1257,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         subscribeRate = new SubscribeRate(0, 10);
         admin.namespaces().setSubscribeRate(namespace, subscribeRate);
         pulsarClient.updateServiceUrl(lookupUrl.toString());
-        Thread.sleep(1000L);
-        assertTrue(consumer.isConnected());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.isConnected()));
         pulsar.getConfiguration().setAuthorizationEnabled(true);
         admin.topics().deletePartitionedTopic(topicName, true);
         admin.namespaces().deleteNamespace(namespace);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 607cdb6..fbbae43 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -25,21 +25,16 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import java.net.URL;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.admin.v1.V1_AdminApiTest.MockedPulsarService;
@@ -61,18 +56,17 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -397,11 +391,7 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         // unload the topic
         unloadTopic(topicName, isPersistentTopic);
         // producer will retry and recreate the topic
-        for (int i = 0; i < 5; i++) {
-            if 
(!pulsar.getBrokerService().getTopicReference(topicName).isPresent() || i != 4) 
{
-                Thread.sleep(200);
-            }
-        }
+        Awaitility.await().until(() -> 
pulsar.getBrokerService().getTopicReference(topicName).isPresent());
         // topic should be loaded by this time
         topic = pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index cca562d..1429c73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -22,13 +22,11 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
-
 import com.beust.jcommander.internal.Maps;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
-
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Collections;
@@ -40,7 +38,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
@@ -57,9 +54,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -68,6 +63,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -152,7 +148,11 @@ public class AntiAffinityNamespaceGroupTest {
         primaryLoadManager = (ModularLoadManagerImpl) 
getField(pulsar1.getLoadManager().get(), "loadManager");
         secondaryLoadManager = (ModularLoadManagerImpl) 
getField(pulsar2.getLoadManager().get(), "loadManager");
         nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
-        Thread.sleep(100);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(pulsar1.getState(), PulsarService.State.Started);
+            assertEquals(pulsar2.getState(), PulsarService.State.Started);
+        });
     }
 
     @AfterMethod(alwaysRun = true)
@@ -423,16 +423,10 @@ public class AntiAffinityNamespaceGroupTest {
         admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, 
namespaceAntiAffinityGroup);
 
         // validate strategically if brokerToDomainCache updated
-        for (int i = 0; i < 5; i++) {
-            if (!isLoadManagerUpdatedDomainCache(primaryLoadManager)
-                    || !isLoadManagerUpdatedDomainCache(secondaryLoadManager)) 
{
-                Thread.sleep(200);
-            } else {
-                break;
-            }
-        }
-        assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager));
-        assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager));
+            assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
+        });
 
         ServiceUnitId serviceUnit1 = makeBundle(tenant, cluster, "ns1");
         String selectedBroker1 = 
primaryLoadManager.selectBrokerForAssignment(serviceUnit1).get();
@@ -554,4 +548,4 @@ public class AntiAffinityNamespaceGroupTest {
                         BoundType.CLOSED));
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 9225443..62b6a52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -26,13 +26,11 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
-
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.URL;
@@ -47,9 +45,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -69,7 +65,6 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -238,11 +233,8 @@ public class ModularLoadManagerImplTest {
 
         LoadData loadData = (LoadData) getField(primaryLoadManager, 
"loadData");
 
-        // Give some time for the watch to fire.
-        Thread.sleep(500);
-
         // Make sure the second broker is not in the internal map.
-        assertFalse(loadData.getBrokerData().containsKey(secondaryHost));
+        Awaitility.await().untilAsserted(() -> 
assertFalse(loadData.getBrokerData().containsKey(secondaryHost)));
 
         // Try 5 more selections, ensure they all go to the first broker.
         for (int i = 2; i < 7; ++i) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 6c85853..1fe531c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -55,6 +55,7 @@ import 
org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -354,12 +355,10 @@ public class OwnershipCacheTest {
         assertFalse(data1.isDisabled());
         assertEquals(cache.getOwnedBundles().size(), 1);
         cache.removeOwnership(bundle);
-        Thread.sleep(500);
-        assertTrue(cache.getOwnedBundles().isEmpty());
-
-        Thread.sleep(500);
-
-        assertFalse(store.exists(ServiceUnitUtils.path(bundle)).join());
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(cache.getOwnedBundles().isEmpty());
+            assertFalse(store.exists(ServiceUnitUtils.path(bundle)).join());
+        });
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
index a784608..de97bd7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.testng.annotations.Test;
-import org.awaitility.Awaitility;
 
 public class GracefulExecutorServicesShutdownTest {
 
@@ -170,4 +169,4 @@ public class GracefulExecutorServicesShutdownTest {
                 .handle();
         assertTrue(future.isDone());
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 87cc281..58d5f8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Sets;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -43,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -276,9 +276,10 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
 
         // Unload topic in r1
         admin1.topics().unload(topicName);
-        Thread.sleep(1000);
-        stats = admin1.topics().getStats(topicName);
-        assertFalse(stats.getSubscriptions().get(subName).isReplicated());
+        Awaitility.await().untilAsserted(() -> {
+            TopicStats stats2 = admin1.topics().getStats(topicName);
+            assertFalse(stats2.getSubscriptions().get(subName).isReplicated());
+        });
 
         // Make sure the replicated subscription is actually disabled
         final int numMessages = 20;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 84af70b..f16f7bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1125,9 +1125,7 @@ public class ServerCnxTest {
         assertEquals(response.getClass(), CommandError.class);
         assertEquals(((CommandError) response).getRequestId(), 3);
 
-        while (serverCnx.hasConsumer(1)) {
-            Thread.sleep(10);
-        }
+        Awaitility.await().until(() -> !serverCnx.hasConsumer(1));
 
         ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, //
                 successSubName, 1 /* consumer id */, 4 /* request id */, 
SubType.Exclusive, 0,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 22e7135..08d7e9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -102,11 +103,11 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
 
         // Wait for consumer to reconnect
-        Thread.sleep(500);
+        Awaitility.await().until(consumer::isConnected);
         consumer.seek(MessageId.earliest);
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
 
-        Thread.sleep(500);
+        Awaitility.await().until(consumer::isConnected);
         consumer.seek(messageIds.get(5));
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 5);
 
@@ -118,11 +119,11 @@ public class SubscriptionSeekTest extends BrokerTestBase {
 
         log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", 
messageId, beforeEarliest, afterLatest);
 
-        Thread.sleep(500);
+        Awaitility.await().until(consumer::isConnected);
         consumer.seek(beforeEarliest);
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
 
-        Thread.sleep(500);
+        Awaitility.await().until(consumer::isConnected);
         consumer.seek(afterLatest);
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
     }
@@ -221,43 +222,43 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         admin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
 
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         Message<String> receiveBeforeEarliest = consumer.receive();
         assertEquals(receiveBeforeEarliest.getValue(), messages.get(0));
 
         admin.topics().resetCursor(topicName, subscriptionName, 
MessageId.latest);
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         Message<String> receiveAfterLatest = consumer.receive(1, 
TimeUnit.SECONDS);
         assertNull(receiveAfterLatest);
 
         admin.topics().resetCursor(topicName, subscriptionName, 
messageIds.get(0), true);
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         Message<String> received = consumer.receive();
         assertEquals(received.getMessageId(), messageIds.get(1));
 
         admin.topics().resetCursor(topicName, subscriptionName, 
messageIds.get(0), false);
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         received = consumer.receive();
         assertEquals(received.getMessageId(), messageIds.get(0));
 
         admin.topics().resetCursor(topicName, subscriptionName, 
messageIds.get(messageIds.size() - 1), true);
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         received = consumer.receive(1, TimeUnit.SECONDS);
         assertNull(received);
 
         admin.topics().resetCursor(topicName, subscriptionName, 
messageIds.get(messageIds.size() - 1), false);
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         received = consumer.receive();
         assertEquals(received.getMessageId(), messageIds.get(messageIds.size() 
- 1));
 
         admin.topics().resetCursor(topicName, subscriptionName, new 
BatchMessageIdImpl(-1, -1, -1 ,10), true);
         // Wait consumer reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         received = consumer.receive();
         assertEquals(received.getMessageId(), messageIds.get(0));
     }
@@ -358,7 +359,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
 
         // Wait for consumer to reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         consumer.seek(currentTimestamp - resetTimeInMillis);
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
     }
@@ -456,7 +457,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(backlogs, 0);
 
         // Wait for consumer to reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer::isConnected);
         consumer.seek(currentTimestamp - resetTimeInMillis);
         backlogs = 0;
 
@@ -494,7 +495,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(connectedSinceSet.size(), 2);
         consumer1.seek(MessageId.earliest);
         // Wait for consumer to reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer1::isConnected);
 
         consumers = 
topicRef.getSubscriptions().get("my-subscription").getConsumers();
         assertEquals(consumers.size(), 2);
@@ -531,7 +532,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(connectedSinceSet.size(), 2);
         consumer1.seek(MessageId.earliest);
         // Wait for consumer to reconnect
-        Thread.sleep(1000);
+        Awaitility.await().until(consumer1::isConnected);
 
         consumers = 
topicRef.getSubscriptions().get("my-subscription").getConsumers();
         assertEquals(consumers.size(), 2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index a7a1111..0cd84f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -220,8 +221,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         Message<byte[]> msg4 = consumer.receive(100, TimeUnit.MILLISECONDS);
         assertNull(msg4);
 
-        Thread.sleep(100);
-        assertTrue(consumer.hasReachedEndOfTopic());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasReachedEndOfTopic()));
     }
 
     @Test(timeOut = 20000)
@@ -254,8 +254,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
         consumer.acknowledgeCumulative(msgId3);
 
-        Thread.sleep(100);
-        assertFalse(consumer.hasReachedEndOfTopic());
+        Awaitility.await().untilAsserted(() -> 
assertFalse(consumer.hasReachedEndOfTopic()));
 
         MessageId lastMessageId = 
admin.topics().terminateTopicAsync(topicName).get();
         assertEquals(lastMessageId, msgId3);
@@ -292,8 +291,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         Message<byte[]> msg4 = reader.readNext(100, TimeUnit.MILLISECONDS);
         assertNull(msg4);
 
-        Thread.sleep(100);
-        assertTrue(reader.hasReachedEndOfTopic());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(reader.hasReachedEndOfTopic()));
     }
 
     @Test(timeOut = 20000)
@@ -323,8 +321,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         /* MessageId msgId2 = */ producer.send("test-msg-2".getBytes());
         MessageId msgId3 = producer.send("test-msg-3".getBytes());
 
-        Thread.sleep(100);
-        assertFalse(reader.hasReachedEndOfTopic());
+        Awaitility.await().untilAsserted(() -> 
assertFalse(reader.hasReachedEndOfTopic()));
 
         MessageId lastMessageId = 
admin.topics().terminateTopicAsync(topicName).get();
         assertEquals(lastMessageId, msgId3);
@@ -348,8 +345,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
-        Thread.sleep(200);
-        assertTrue(consumer.hasReachedEndOfTopic());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasReachedEndOfTopic()));
     }
 
     @Test(timeOut = 20000)
@@ -363,7 +359,6 @@ public class TopicTerminationTest extends BrokerTestBase {
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
-        Thread.sleep(200);
-        assertTrue(consumer.hasReachedEndOfTopic());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasReachedEndOfTopic()));
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index e66525e..13e8c69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -19,9 +19,7 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -31,10 +29,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
@@ -57,7 +53,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
@@ -289,10 +284,9 @@ public class TransactionProduceTest extends TransactionTestBase {
             consumer.acknowledgeAsync(message.getMessageId(), txn);
         }
 
-        Thread.sleep(1000);
-
         // The pending messages count should be the incomingMessageCnt
-        Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, 
subscriptionName), incomingMessageCnt);
+        Awaitility.await().untilAsserted(
+                () -> Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, 
subscriptionName), incomingMessageCnt));
 
         consumer.redeliverUnacknowledgedMessages();
         Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
@@ -303,10 +297,9 @@ public class TransactionProduceTest extends TransactionTestBase {
 
         txn.commit().get();
 
-        Thread.sleep(1000);
-
         // After commit, the pending messages count should be 0
-        Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, 
subscriptionName), 0);
+        Awaitility.await().untilAsserted(
+                () -> Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, 
subscriptionName), 0));
 
         consumer.redeliverUnacknowledgedMessages();
         for (int i = 0; i < incomingMessageCnt; i++) {
@@ -352,10 +345,9 @@ public class TransactionProduceTest extends TransactionTestBase {
             consumer.acknowledgeAsync(message.getMessageId(), txn);
         }
 
-        Thread.sleep(1000);
-
         // The pending messages count should be the incomingMessageCnt
-        Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, 
subscriptionName), incomingMessageCnt);
+        Awaitility.await().untilAsserted(
+                () -> Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, 
subscriptionName), incomingMessageCnt));
 
         consumer.redeliverUnacknowledgedMessages();
         Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
@@ -366,10 +358,9 @@ public class TransactionProduceTest extends TransactionTestBase {
 
         txn.abort().get();
 
-        Thread.sleep(1000);
-
         // After commit, the pending messages count should be 0
-        Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, 
subscriptionName), 0);
+        Awaitility.await().untilAsserted(
+                () -> Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, 
subscriptionName), 0));
 
         consumer.redeliverUnacknowledgedMessages();
         for (int i = 0; i < incomingMessageCnt; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 1e4b1de..d3bdea2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -26,9 +26,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Optional;
@@ -41,7 +39,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-
 import lombok.Cleanup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -60,7 +57,6 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
@@ -170,8 +166,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         for (int i = 0; i < totalProduceMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(10);
         }
+        producer.flush();
 
         Message<?> msg = null;
         Set<String> messageSet = Sets.newHashSet();
@@ -213,8 +209,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         for (int i = 0; i < totalProduceMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(10);
         }
+        producer.flush();
 
         Message<?> msg = null;
         Set<String> messageSet = Sets.newHashSet();
@@ -269,8 +265,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         for (int i = 0; i < totalProduceMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(10);
         }
+        producer.flush();
 
         Message<?> msg = null;
         Set<String> messageSet = Sets.newHashSet();
@@ -311,8 +307,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         for (int i = 0; i < totalProduceMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(10);
         }
+        producer.flush();
 
         Message<?> msg = null;
         Set<String> messageSet = Sets.newHashSet();
@@ -421,8 +417,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         for (int i = 0; i < totalProduceMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(10);
         }
+        producer.flush();
 
         // consume from shared-subscriptions
         Message<?> msg = null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 5ebb2fb..04516c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -453,8 +453,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS), 
"Timed out waiting for message listener acks");
 
         log.info("Giving message listener an opportunity to receive messages 
while paused");
-        Thread.sleep(2000);     // hopefully this is long enough
-        assertEquals(received.intValue(), receiverQueueSize, "Consumer 
received messages while paused");
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(received.intValue(), receiverQueueSize, 
"Consumer received messages while paused"));
 
         latch.set(new CountDownLatch(receiverQueueSize));
 
@@ -498,8 +498,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         // Make sure no flow permits are sent when the consumer reconnects to 
the topic
         admin.topics().unload(topicName);
-        Thread.sleep(2000);
-        assertEquals(received.intValue(), receiverQueueSize, "Consumer 
received messages while paused");
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(received.intValue(), receiverQueueSize, 
"Consumer received messages while paused"));
+
 
         latch.set(new CountDownLatch(receiverQueueSize));
         consumer.resume();
@@ -1216,22 +1217,26 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
              CompletableFuture<MessageId> future = 
producer.newMessage().sequenceId(i).value(message.getBytes()).sendAsync();
              futures.add(future);
         }
-        Thread.sleep(3000);
-        futures.get(0).exceptionally(ex -> {
-            long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
-            Assert.assertEquals(sequenceId, 0L);
-            return null;
-        });
-        futures.get(1).exceptionally(ex -> {
-            long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
-            Assert.assertEquals(sequenceId, 1L);
-            return null;
-        });
-        futures.get(2).exceptionally(ex -> {
-            long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
-            Assert.assertEquals(sequenceId, 2L);
-            return null;
+        Awaitility.await().until(() -> {
+            futures.get(0).exceptionally(ex -> {
+                long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
+                Assert.assertEquals(sequenceId, 0L);
+                return null;
+            });
+            futures.get(1).exceptionally(ex -> {
+                long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
+                Assert.assertEquals(sequenceId, 1L);
+                return null;
+            });
+            futures.get(2).exceptionally(ex -> {
+                long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
+                Assert.assertEquals(sequenceId, 2L);
+                return null;
+            });
+
+            return true;
         });
+
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -1639,12 +1644,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             }
 
             // (2) wait for consumer to receive messages
-            Thread.sleep(1000);
-            assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);
+            Awaitility.await().untilAsserted(() -> 
assertEquals(consumer.numMessagesInQueue(), receiverQueueSize));
 
             // (3) wait for messages to expire, we should've received more
-            Thread.sleep(2000);
-            assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);
+            Awaitility.await().untilAsserted(() -> 
assertEquals(consumer.numMessagesInQueue(), receiverQueueSize));
 
             for (int i = 0; i < totalProducedMsgs; i++) {
                 Message<byte[]> msg = 
consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -2065,8 +2068,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             for (int i = 0; i < totalProducedMsgs; i++) {
                 String message = "my-message-" + i;
                 producer.send(message.getBytes());
-                Thread.sleep(10);
             }
+            producer.flush();
 
             // (1.a) start consumer again
             consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
@@ -2348,8 +2351,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         for (int i = 0; i < receiverQueueSize; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(10);
         }
+        producer.flush();
         // (1.a) consume first consumeMsgInParts msgs and trigger redeliver
         Message<byte[]> msg;
         List<Message<byte[]>> messages1 = Lists.newArrayList();
@@ -2387,8 +2390,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         for (int i = 0; i < receiverQueueSize; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
-            Thread.sleep(100);
         }
+        producer.flush();
 
         int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts);
         messages1.clear();
@@ -3190,9 +3193,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         admin.topics().updatePartitionedTopic(topicName, 3);
 
         // 4. wait for client to update partitions
-        while(((MultiTopicsConsumerImpl)consumer).getConsumers().size() <= 1) {
-            Thread.sleep(1);
-        }
+        Awaitility.await().until(() -> ((MultiTopicsConsumerImpl) 
consumer).getConsumers().size() <= 1);
 
         // 5. produce 5 more messages
         for (int i = 5; i < 10; i++) {
@@ -3288,7 +3289,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         // 6. should not consume any messages
-        assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS));
+        Awaitility.await().untilAsserted(() -> 
assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
 
         // 7. resume multi-topic consumer
         consumer.resume();
@@ -4135,4 +4136,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         blockedMessageLatch.countDown();
         log.info("-- Exiting {} test --", methodName);
     }
-}
\ No newline at end of file
+}

Reply via email to