This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3ba8d20 [tests] use Awaitility replace Thread.sleep for pulsar-broker. (#11281) 3ba8d20 is described below commit 3ba8d202246996416e34b325f34a1965510cd87e Author: YANGLiiN <ie...@qq.com> AuthorDate: Mon Jul 19 22:12:01 2021 +0800 [tests] use Awaitility replace Thread.sleep for pulsar-broker. (#11281) --- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 4 +- .../broker/admin/AdminApiGetLastMessageIdTest.java | 34 ++++++------ .../broker/admin/AdminApiSchemaAutoUpdateTest.java | 16 ++---- .../apache/pulsar/broker/admin/AdminApiTest.java | 45 ++++------------ .../apache/pulsar/broker/admin/AdminApiTest2.java | 50 ++++++++--------- .../apache/pulsar/broker/admin/NamespacesTest.java | 8 ++- .../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 18 ++----- .../AntiAffinityNamespaceGroupTest.java | 28 ++++------ .../loadbalance/ModularLoadManagerImplTest.java | 10 +--- .../broker/namespace/OwnershipCacheTest.java | 11 ++-- .../GracefulExecutorServicesShutdownTest.java | 3 +- .../broker/service/ReplicatorSubscriptionTest.java | 9 ++-- .../pulsar/broker/service/ServerCnxTest.java | 4 +- .../broker/service/SubscriptionSeekTest.java | 31 +++++------ .../broker/service/TopicTerminationTest.java | 19 +++---- .../broker/transaction/TransactionProduceTest.java | 25 +++------ .../pulsar/client/api/NonPersistentTopicTest.java | 14 ++--- .../client/api/SimpleProducerConsumerTest.java | 63 +++++++++++----------- 18 files changed, 156 insertions(+), 236 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 2a648ea..422ce53 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -946,9 +946,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ledger.addEntry("dummy-entry-7".getBytes(Encoding)); // Verify that GC trimming kicks in - while (ledger.getNumberOfEntries() > 2) { - Thread.sleep(10); - } + Awaitility.await().until(() -> ledger.getNumberOfEntries() <= 2); } @Test(timeOut = 20000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java index 3e85523..cba0686 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java @@ -18,7 +18,21 @@ */ package org.apache.pulsar.broker.admin; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Date; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; +import javax.ws.rs.core.UriInfo; import org.apache.pulsar.broker.admin.v2.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; @@ -41,22 +55,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.TimeoutHandler; -import javax.ws.rs.core.UriInfo; -import java.lang.reflect.Field; -import java.util.Collection; -import java.util.Date; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - @Test(groups = "broker") public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest { @@ -204,9 +202,7 @@ public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest { } persistentTopics.getLastMessageId(asyncResponse, "prop", "ns-abc", "my-topic", true); - while (id[0] == null) { - Thread.sleep(1); - } + Awaitility.await().until(() -> id[0] != null); Assert.assertTrue(((MessageIdImpl)id[0]).getLedgerId() >= 0); Assert.assertEquals(numberOfMessages-1, ((MessageIdImpl)id[0]).getEntryId()); messageId = id[0]; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java index ec2b1e8..891e7ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java @@ -18,25 +18,20 @@ */ package org.apache.pulsar.broker.admin; -import org.apache.avro.reflect.AvroAlias; -import org.apache.avro.reflect.AvroDefault; import com.google.common.collect.Sets; - import java.lang.reflect.Field; - import lombok.extern.slf4j.Slf4j; - +import org.apache.avro.reflect.AvroAlias; +import org.apache.avro.reflect.AvroDefault; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; - +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -192,7 +187,6 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { if (strategy.get(t) == SchemaCompatibilityStrategy.FULL) { break; } - Thread.sleep(100); } log.info("try with fully compat, again"); try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 7ac114e..c1b8b66 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -263,9 +263,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { .build()); admin.clusters().deleteCluster("usw"); - Thread.sleep(300); - - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")); + Awaitility.await() + .untilAsserted(() -> assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"))); admin.namespaces().deleteNamespace("prop-xyz/ns1"); admin.clusters().deleteCluster("test"); @@ -598,13 +597,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMap), Optional.empty()).join(); // wait config to be updated - for (int i = 0; i < 5; i++) { - if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != newValue) { - Thread.sleep(100 + (i * 10)); - } else { - break; - } - } + Awaitility.await().until(() -> pulsar.getConfiguration().getBrokerShutdownTimeoutMs() == newValue); // verify value is updated assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue); } @@ -1588,15 +1581,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { LOG.info("--- RELOAD ---"); // Force reload of namespace and wait for topic to be ready - for (int i = 0; i < 30; i++) { - try { - admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); - break; - } catch (PulsarAdminException e) { - LOG.warn("Failed to get topic stats.. {}", e.getMessage()); - Thread.sleep(1000); - } - } + Awaitility.await().timeout(30, TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class) + .until(() -> admin.topics().getStats("persistent://prop-xyz/ns1/ds2") != null); admin.topics().deleteSubscription("persistent://prop-xyz/ns1/ds2", "my-sub"); admin.topics().delete("persistent://prop-xyz/ns1/ds2"); @@ -1646,15 +1632,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { LOG.info("--- RELOAD ---"); // Force reload of namespace and wait for topic to be ready - for (int i = 0; i < 30; i++) { - try { - admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2"); - break; - } catch (PulsarAdminException e) { - LOG.warn("Failed to get topic stats.. {}", e.getMessage()); - Thread.sleep(1000); - } - } + Awaitility.await().timeout(30, TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class) + .until(() -> admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2") != null); admin.topics().deleteSubscription("persistent://prop-xyz/ns1-bundles/ds2", "my-sub"); admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2"); @@ -2847,13 +2826,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Assert.assertEquals((int) admin.namespaces().getSubscriptionExpirationTime(namespace2), 1); Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(namespace3)); - Thread.sleep(60000); - for (int i = 0; i < 60; i++) { - if (admin.topics().getSubscriptions(topic2).size() == 0) { - break; - } - Thread.sleep(1000); - } + + Awaitility.await().timeout(120, TimeUnit.SECONDS) + .until(() -> admin.topics().getSubscriptions(topic2).size() == 0); Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1); Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0); Assert.assertEquals(admin.topics().getSubscriptions(topic3).size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index b67939d..f07a2ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -484,11 +484,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { // unload the topic unloadTopic(topicName); // producer will retry and recreate the topic - for (int i = 0; i < 5; i++) { - if (!pulsar.getBrokerService().getTopicReference(topicName).isPresent() || i != 4) { - Thread.sleep(200); - } - } + Awaitility.await().until(() -> pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // topic should be loaded by this time topic = pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topic); @@ -1255,12 +1251,12 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { consumer.acknowledge(message); // wait for ack send - Thread.sleep(500); - - // Consumer acks the message, so the precise backlog is 0 - topicStats = admin.topics().getStats(topic, true, true); - assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 0); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 0); + Awaitility.await().untilAsserted(() -> { + // Consumer acks the message, so the precise backlog is 0 + TopicStats topicStats2 = admin.topics().getStats(topic, true, true); + assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 0); + assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 0); + }); topicStats = admin.topics().getStats(topic); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 9); @@ -1302,20 +1298,24 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { // Wait for messages to be tracked for delayed delivery. This happens // on the consumer dispatch side, so when the send() is complete we're // not yet guaranteed to see the stats updated. - Thread.sleep(500); + Awaitility.await().untilAsserted(() -> { + TopicStats topicStats = admin.topics().getStats(topic, true, true); + assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10); + assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5); + }); - TopicStats topicStats = admin.topics().getStats(topic, true, true); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5); for (int i = 0; i < 5; i++) { consumer.acknowledge(consumer.receive()); } + // Wait the ack send. - Thread.sleep(500); - topicStats = admin.topics().getStats(topic, true, true); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0); + Awaitility.await().untilAsserted(() -> { + TopicStats topicStats = admin.topics().getStats(topic, true, true); + assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5); + assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0); + }); + } @Test @@ -1403,11 +1403,13 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { consumer.acknowledge(consumer.receive()); } // Wait the ack send. - Thread.sleep(500); - topicStats = admin.topics().getPartitionedStats(topic, false, true, true); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5); - assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 238); - assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0); + Awaitility.await().untilAsserted(() -> { + TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true); + assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5); + assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 238); + assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0); + }); + } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index b2eda82..de0ed94 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -85,7 +85,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -102,6 +101,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; +import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -1246,8 +1246,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { // Subscribe Rate Limiter is enabled, will limited by broker pulsarClient.updateServiceUrl(lookupUrl.toString()); - Thread.sleep(1000L); - assertFalse(consumer.isConnected()); + Awaitility.await().untilAsserted(() -> assertFalse(consumer.isConnected())); // Out of limit period Thread.sleep(6000L); @@ -1258,8 +1257,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { subscribeRate = new SubscribeRate(0, 10); admin.namespaces().setSubscribeRate(namespace, subscribeRate); pulsarClient.updateServiceUrl(lookupUrl.toString()); - Thread.sleep(1000L); - assertTrue(consumer.isConnected()); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); pulsar.getConfiguration().setAuthorizationEnabled(true); admin.topics().deletePartitionedTopic(topicName, true); admin.namespaces().deleteNamespace(namespace); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java index 607cdb6..fbbae43 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java @@ -25,21 +25,16 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - import java.net.URL; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; - import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.admin.v1.V1_AdminApiTest.MockedPulsarService; @@ -61,18 +56,17 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; -import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -397,11 +391,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { // unload the topic unloadTopic(topicName, isPersistentTopic); // producer will retry and recreate the topic - for (int i = 0; i < 5; i++) { - if (!pulsar.getBrokerService().getTopicReference(topicName).isPresent() || i != 4) { - Thread.sleep(200); - } - } + Awaitility.await().until(() -> pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // topic should be loaded by this time topic = pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index cca562d..1429c73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -22,13 +22,11 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; - import com.beust.jcommander.internal.Maps; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; - import java.lang.reflect.Field; import java.net.URL; import java.util.Collections; @@ -40,7 +38,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; @@ -57,9 +54,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.FailureDomain; -import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -68,6 +63,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -152,7 +148,11 @@ public class AntiAffinityNamespaceGroupTest { primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager"); secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager"); nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32()); - Thread.sleep(100); + + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar1.getState(), PulsarService.State.Started); + assertEquals(pulsar2.getState(), PulsarService.State.Started); + }); } @AfterMethod(alwaysRun = true) @@ -423,16 +423,10 @@ public class AntiAffinityNamespaceGroupTest { admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, namespaceAntiAffinityGroup); // validate strategically if brokerToDomainCache updated - for (int i = 0; i < 5; i++) { - if (!isLoadManagerUpdatedDomainCache(primaryLoadManager) - || !isLoadManagerUpdatedDomainCache(secondaryLoadManager)) { - Thread.sleep(200); - } else { - break; - } - } - assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager)); - assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager)); + Awaitility.await().untilAsserted(() -> { + assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager)); + assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager)); + }); ServiceUnitId serviceUnit1 = makeBundle(tenant, cluster, "ns1"); String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit1).get(); @@ -554,4 +548,4 @@ public class AntiAffinityNamespaceGroupTest { BoundType.CLOSED)); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 9225443..62b6a52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -26,13 +26,11 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; - import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.URL; @@ -47,9 +45,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; - import lombok.extern.slf4j.Slf4j; - import org.apache.pulsar.broker.BrokerData; import org.apache.pulsar.broker.BundleData; import org.apache.pulsar.broker.PulsarServerException; @@ -69,7 +65,6 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -238,11 +233,8 @@ public class ModularLoadManagerImplTest { LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData"); - // Give some time for the watch to fire. - Thread.sleep(500); - // Make sure the second broker is not in the internal map. - assertFalse(loadData.getBrokerData().containsKey(secondaryHost)); + Awaitility.await().untilAsserted(() -> assertFalse(loadData.getBrokerData().containsKey(secondaryHost))); // Try 5 more selections, ensure they all go to the first broker. for (int i = 2; i < 7; ++i) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index 6c85853..1fe531c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -55,6 +55,7 @@ import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -354,12 +355,10 @@ public class OwnershipCacheTest { assertFalse(data1.isDisabled()); assertEquals(cache.getOwnedBundles().size(), 1); cache.removeOwnership(bundle); - Thread.sleep(500); - assertTrue(cache.getOwnedBundles().isEmpty()); - - Thread.sleep(500); - - assertFalse(store.exists(ServiceUnitUtils.path(bundle)).join()); + Awaitility.await().untilAsserted(() -> { + assertTrue(cache.getOwnedBundles().isEmpty()); + assertFalse(store.exists(ServiceUnitUtils.path(bundle)).join()); + }); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java index a784608..de97bd7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.testng.annotations.Test; -import org.awaitility.Awaitility; public class GracefulExecutorServicesShutdownTest { @@ -170,4 +169,4 @@ public class GracefulExecutorServicesShutdownTest { .handle(); assertTrue(future.isDone()); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 87cc281..58d5f8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.LinkedHashSet; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -43,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -276,9 +276,10 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { // Unload topic in r1 admin1.topics().unload(topicName); - Thread.sleep(1000); - stats = admin1.topics().getStats(topicName); - assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + Awaitility.await().untilAsserted(() -> { + TopicStats stats2 = admin1.topics().getStats(topicName); + assertFalse(stats2.getSubscriptions().get(subName).isReplicated()); + }); // Make sure the replicated subscription is actually disabled final int numMessages = 20; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 84af70b..f16f7bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -1125,9 +1125,7 @@ public class ServerCnxTest { assertEquals(response.getClass(), CommandError.class); assertEquals(((CommandError) response).getRequestId(), 3); - while (serverCnx.hasConsumer(1)) { - Thread.sleep(10); - } + Awaitility.await().until(() -> !serverCnx.hasConsumer(1)); ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, // successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 22e7135..08d7e9c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.RelativeTimeUtil; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -102,11 +103,11 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); // Wait for consumer to reconnect - Thread.sleep(500); + Awaitility.await().until(consumer::isConnected); consumer.seek(MessageId.earliest); assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); - Thread.sleep(500); + Awaitility.await().until(consumer::isConnected); consumer.seek(messageIds.get(5)); assertEquals(sub.getNumberOfEntriesInBacklog(false), 5); @@ -118,11 +119,11 @@ public class SubscriptionSeekTest extends BrokerTestBase { log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageId, beforeEarliest, afterLatest); - Thread.sleep(500); + Awaitility.await().until(consumer::isConnected); consumer.seek(beforeEarliest); assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); - Thread.sleep(500); + Awaitility.await().until(consumer::isConnected); consumer.seek(afterLatest); assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); } @@ -221,43 +222,43 @@ public class SubscriptionSeekTest extends BrokerTestBase { admin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); Message<String> receiveBeforeEarliest = consumer.receive(); assertEquals(receiveBeforeEarliest.getValue(), messages.get(0)); admin.topics().resetCursor(topicName, subscriptionName, MessageId.latest); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS); assertNull(receiveAfterLatest); admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(0), true); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); Message<String> received = consumer.receive(); assertEquals(received.getMessageId(), messageIds.get(1)); admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(0), false); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); received = consumer.receive(); assertEquals(received.getMessageId(), messageIds.get(0)); admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(messageIds.size() - 1), true); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); received = consumer.receive(1, TimeUnit.SECONDS); assertNull(received); admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(messageIds.size() - 1), false); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); received = consumer.receive(); assertEquals(received.getMessageId(), messageIds.get(messageIds.size() - 1)); admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1, -1, -1 ,10), true); // Wait consumer reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); received = consumer.receive(); assertEquals(received.getMessageId(), messageIds.get(0)); } @@ -358,7 +359,7 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); // Wait for consumer to reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); consumer.seek(currentTimestamp - resetTimeInMillis); assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); } @@ -456,7 +457,7 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertEquals(backlogs, 0); // Wait for consumer to reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer::isConnected); consumer.seek(currentTimestamp - resetTimeInMillis); backlogs = 0; @@ -494,7 +495,7 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertEquals(connectedSinceSet.size(), 2); consumer1.seek(MessageId.earliest); // Wait for consumer to reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer1::isConnected); consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers(); assertEquals(consumers.size(), 2); @@ -531,7 +532,7 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertEquals(connectedSinceSet.size(), 2); consumer1.seek(MessageId.earliest); // Wait for consumer to reconnect - Thread.sleep(1000); + Awaitility.await().until(consumer1::isConnected); consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers(); assertEquals(consumers.size(), 2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java index a7a1111..0cd84f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -220,8 +221,7 @@ public class TopicTerminationTest extends BrokerTestBase { Message<byte[]> msg4 = consumer.receive(100, TimeUnit.MILLISECONDS); assertNull(msg4); - Thread.sleep(100); - assertTrue(consumer.hasReachedEndOfTopic()); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasReachedEndOfTopic())); } @Test(timeOut = 20000) @@ -254,8 +254,7 @@ public class TopicTerminationTest extends BrokerTestBase { consumer.acknowledgeCumulative(msgId3); - Thread.sleep(100); - assertFalse(consumer.hasReachedEndOfTopic()); + Awaitility.await().untilAsserted(() -> assertFalse(consumer.hasReachedEndOfTopic())); MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get(); assertEquals(lastMessageId, msgId3); @@ -292,8 +291,7 @@ public class TopicTerminationTest extends BrokerTestBase { Message<byte[]> msg4 = reader.readNext(100, TimeUnit.MILLISECONDS); assertNull(msg4); - Thread.sleep(100); - assertTrue(reader.hasReachedEndOfTopic()); + Awaitility.await().untilAsserted(() -> assertTrue(reader.hasReachedEndOfTopic())); } @Test(timeOut = 20000) @@ -323,8 +321,7 @@ public class TopicTerminationTest extends BrokerTestBase { /* MessageId msgId2 = */ producer.send("test-msg-2".getBytes()); MessageId msgId3 = producer.send("test-msg-3".getBytes()); - Thread.sleep(100); - assertFalse(reader.hasReachedEndOfTopic()); + Awaitility.await().untilAsserted(() -> assertFalse(reader.hasReachedEndOfTopic())); MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get(); assertEquals(lastMessageId, msgId3); @@ -348,8 +345,7 @@ public class TopicTerminationTest extends BrokerTestBase { org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").subscribe(); - Thread.sleep(200); - assertTrue(consumer.hasReachedEndOfTopic()); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasReachedEndOfTopic())); } @Test(timeOut = 20000) @@ -363,7 +359,6 @@ public class TopicTerminationTest extends BrokerTestBase { org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").subscribe(); - Thread.sleep(200); - assertTrue(consumer.hasReachedEndOfTopic()); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasReachedEndOfTopic())); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index e66525e..13e8c69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; - import com.google.common.collect.Sets; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -31,10 +29,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; - import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ReadOnlyCursor; @@ -57,7 +53,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; @@ -289,10 +284,9 @@ public class TransactionProduceTest extends TransactionTestBase { consumer.acknowledgeAsync(message.getMessageId(), txn); } - Thread.sleep(1000); - // The pending messages count should be the incomingMessageCnt - Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), incomingMessageCnt); + Awaitility.await().untilAsserted( + () -> Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), incomingMessageCnt)); consumer.redeliverUnacknowledgedMessages(); Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS); @@ -303,10 +297,9 @@ public class TransactionProduceTest extends TransactionTestBase { txn.commit().get(); - Thread.sleep(1000); - // After commit, the pending messages count should be 0 - Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), 0); + Awaitility.await().untilAsserted( + () -> Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), 0)); consumer.redeliverUnacknowledgedMessages(); for (int i = 0; i < incomingMessageCnt; i++) { @@ -352,10 +345,9 @@ public class TransactionProduceTest extends TransactionTestBase { consumer.acknowledgeAsync(message.getMessageId(), txn); } - Thread.sleep(1000); - // The pending messages count should be the incomingMessageCnt - Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), incomingMessageCnt); + Awaitility.await().untilAsserted( + () -> Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), incomingMessageCnt)); consumer.redeliverUnacknowledgedMessages(); Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS); @@ -366,10 +358,9 @@ public class TransactionProduceTest extends TransactionTestBase { txn.abort().get(); - Thread.sleep(1000); - // After commit, the pending messages count should be 0 - Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), 0); + Awaitility.await().untilAsserted( + () -> Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), 0)); consumer.redeliverUnacknowledgedMessages(); for (int i = 0; i < incomingMessageCnt; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 1e4b1de..d3bdea2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -26,9 +26,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Sets; - import java.lang.reflect.Field; import java.net.URL; import java.util.Optional; @@ -41,7 +39,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; - import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -60,7 +57,6 @@ import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats; import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; @@ -170,8 +166,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { for (int i = 0; i < totalProduceMsg; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); Message<?> msg = null; Set<String> messageSet = Sets.newHashSet(); @@ -213,8 +209,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { for (int i = 0; i < totalProduceMsg; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); Message<?> msg = null; Set<String> messageSet = Sets.newHashSet(); @@ -269,8 +265,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { for (int i = 0; i < totalProduceMsg; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); Message<?> msg = null; Set<String> messageSet = Sets.newHashSet(); @@ -311,8 +307,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { for (int i = 0; i < totalProduceMsg; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); Message<?> msg = null; Set<String> messageSet = Sets.newHashSet(); @@ -421,8 +417,8 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { for (int i = 0; i < totalProduceMsg; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); // consume from shared-subscriptions Message<?> msg = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 5ebb2fb..04516c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -453,8 +453,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS), "Timed out waiting for message listener acks"); log.info("Giving message listener an opportunity to receive messages while paused"); - Thread.sleep(2000); // hopefully this is long enough - assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused"); + Awaitility.await().untilAsserted( + () -> assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused")); latch.set(new CountDownLatch(receiverQueueSize)); @@ -498,8 +498,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // Make sure no flow permits are sent when the consumer reconnects to the topic admin.topics().unload(topicName); - Thread.sleep(2000); - assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused"); + Awaitility.await().untilAsserted( + () -> assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused")); + latch.set(new CountDownLatch(receiverQueueSize)); consumer.resume(); @@ -1216,22 +1217,26 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value(message.getBytes()).sendAsync(); futures.add(future); } - Thread.sleep(3000); - futures.get(0).exceptionally(ex -> { - long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId(); - Assert.assertEquals(sequenceId, 0L); - return null; - }); - futures.get(1).exceptionally(ex -> { - long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId(); - Assert.assertEquals(sequenceId, 1L); - return null; - }); - futures.get(2).exceptionally(ex -> { - long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId(); - Assert.assertEquals(sequenceId, 2L); - return null; + Awaitility.await().until(() -> { + futures.get(0).exceptionally(ex -> { + long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId(); + Assert.assertEquals(sequenceId, 0L); + return null; + }); + futures.get(1).exceptionally(ex -> { + long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId(); + Assert.assertEquals(sequenceId, 1L); + return null; + }); + futures.get(2).exceptionally(ex -> { + long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId(); + Assert.assertEquals(sequenceId, 2L); + return null; + }); + + return true; }); + log.info("-- Exiting {} test --", methodName); } @@ -1639,12 +1644,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } // (2) wait for consumer to receive messages - Thread.sleep(1000); - assertEquals(consumer.numMessagesInQueue(), receiverQueueSize); + Awaitility.await().untilAsserted(() -> assertEquals(consumer.numMessagesInQueue(), receiverQueueSize)); // (3) wait for messages to expire, we should've received more - Thread.sleep(2000); - assertEquals(consumer.numMessagesInQueue(), receiverQueueSize); + Awaitility.await().untilAsserted(() -> assertEquals(consumer.numMessagesInQueue(), receiverQueueSize)); for (int i = 0; i < totalProducedMsgs; i++) { Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -2065,8 +2068,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { for (int i = 0; i < totalProducedMsgs; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); // (1.a) start consumer again consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() @@ -2348,8 +2351,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { for (int i = 0; i < receiverQueueSize; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(10); } + producer.flush(); // (1.a) consume first consumeMsgInParts msgs and trigger redeliver Message<byte[]> msg; List<Message<byte[]>> messages1 = Lists.newArrayList(); @@ -2387,8 +2390,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { for (int i = 0; i < receiverQueueSize; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - Thread.sleep(100); } + producer.flush(); int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts); messages1.clear(); @@ -3190,9 +3193,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { admin.topics().updatePartitionedTopic(topicName, 3); // 4. wait for client to update partitions - while(((MultiTopicsConsumerImpl)consumer).getConsumers().size() <= 1) { - Thread.sleep(1); - } + Awaitility.await().until(() -> ((MultiTopicsConsumerImpl) consumer).getConsumers().size() <= 1); // 5. produce 5 more messages for (int i = 5; i < 10; i++) { @@ -3288,7 +3289,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } // 6. should not consume any messages - assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + Awaitility.await().untilAsserted(() -> assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS))); // 7. resume multi-topic consumer consumer.resume(); @@ -4135,4 +4136,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { blockedMessageLatch.countDown(); log.info("-- Exiting {} test --", methodName); } -} \ No newline at end of file +}