This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch xaingying/cherry-pick-2.10/improve_test in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit de786f632798eddc1ec4c8c65c622884ae39d9a4 Author: Cong Zhao <[email protected]> AuthorDate: Thu Oct 13 12:20:43 2022 +0800 [improve][test] Improve AdminApiTest to reduce the execution time (#17980) (cherry picked from commit e4d3c494677b04aee7196c431c3d50835c5044f8) --- .../apache/pulsar/broker/admin/AdminApiTest.java | 131 ++++++++++++++++----- 1 file changed, 100 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 7206de36090..3f151daf8d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -135,8 +135,9 @@ import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -159,7 +160,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { private NamespaceBundleFactory bundleFactory; - @BeforeMethod + @BeforeClass @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); @@ -186,14 +187,39 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { otherPulsar = mockPulsarSetup.getPulsar(); otheradmin = mockPulsarSetup.getAdmin(); - // Setup namespaces + setupClusters(); + } + + @AfterMethod(alwaysRun = true) + public void resetClusters() throws Exception { + pulsar.getConfiguration().setForceDeleteTenantAllowed(true); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + for (String tenant : admin.tenants().getTenants()) { + for (String namespace : admin.namespaces().getNamespaces(tenant)) { + deleteNamespaceGraceFully(namespace, true); + } + admin.tenants().deleteTenant(tenant, true); + } + + for (String cluster : admin.clusters().getClusters()) { + admin.clusters().deleteCluster(cluster); + } + + pulsar.getConfiguration().setForceDeleteTenantAllowed(false); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + + resetConfig(); + setupClusters(); + } + + private void setupClusters() throws PulsarAdminException { admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); admin.tenants().createTenant("prop-xyz", tenantInfo); admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test")); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { adminTls.close(); @@ -1167,6 +1193,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { String subName = "my-sub"; // create consumer and subscription + @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subName).subscribe(); TopicStats topicStats = admin.topics().getStats(topic, false, false, true); @@ -1204,7 +1231,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2); // create consumer and subscription - pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName(subName).subscribe(); + @Cleanup + Consumer<byte[]> consumer = + pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName(subName).subscribe(); // publish several messages publishMessagesOnPersistentTopic(partitionedTopicName, 10); @@ -1403,8 +1432,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testNamespaceSplitBundle() throws Exception { + admin.namespaces().createNamespace("prop-xyz/splitBundle", Set.of("test")); + // Force to create a topic - final String namespace = "prop-xyz/ns1"; + final String namespace = "prop-xyz/splitBundle"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) .topic(topicName) @@ -1466,7 +1497,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { for (int i = 0; i < bundles.getBundles().size(); i++) { assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); } - producers.forEach(Producer::closeAsync); + for (Producer<byte[]> producer : producers) { + producer.close(); + } } @Test @@ -1483,6 +1516,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception { + cleanup(); + setup(); + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE); // Force to create a topic final String namespace = "prop-xyz/ns1"; @@ -1516,7 +1552,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { for (int i = 0; i < bundles.getBundles().size(); i++) { assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); } - producers.forEach(Producer::closeAsync); + for (Producer<byte[]> producer : producers) { + producer.close(); + } conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME); } @@ -1618,22 +1656,24 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testNamespaceUnloadBundle() throws Exception { - assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList()); + admin.namespaces().createNamespace("prop-xyz/unloadBundle", Set.of("test")); + + assertEquals(admin.topics().getList("prop-xyz/unloadBundle"), new ArrayList<>()); // Force to create a topic - publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0); - assertEquals(admin.topics().getList("prop-xyz/ns1"), - Lists.newArrayList("persistent://prop-xyz/ns1/ds2")); + publishMessagesOnPersistentTopic("persistent://prop-xyz/unloadBundle/ds2", 0); + assertEquals(admin.topics().getList("prop-xyz/unloadBundle"), + Lists.newArrayList("persistent://prop-xyz/unloadBundle/ds2")); // create consumer and subscription - Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1/ds2") + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-xyz/unloadBundle/ds2") .subscriptionName("my-sub").subscribe(); - assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2"), + assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/unloadBundle/ds2"), Lists.newArrayList("my-sub")); // Create producer Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) - .topic("persistent://prop-xyz/ns1/ds2") + .topic("persistent://prop-xyz/unloadBundle/ds2") .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @@ -1646,13 +1686,13 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { producer.close(); try { - admin.namespaces().unloadNamespaceBundle("prop-xyz/ns1", "0x00000000_0xffffffff"); + admin.namespaces().unloadNamespaceBundle("prop-xyz/unloadBundle", "0x00000000_0xffffffff"); } catch (Exception e) { fail("Unload shouldn't have throw exception"); } // check that no one owns the namespace - NamespaceBundle bundle = bundleFactory.getBundle(NamespaceName.get("prop-xyz/ns1"), + NamespaceBundle bundle = bundleFactory.getBundle(NamespaceName.get("prop-xyz/unloadBundle"), Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED)); assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle)); assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle)); @@ -1662,14 +1702,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // Force reload of namespace and wait for topic to be ready Awaitility.await().timeout(30, TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class) - .until(() -> admin.topics().getStats("persistent://prop-xyz/ns1/ds2") != null); + .until(() -> admin.topics().getStats("persistent://prop-xyz/unloadBundle/ds2") != null); - admin.topics().deleteSubscription("persistent://prop-xyz/ns1/ds2", "my-sub"); - admin.topics().delete("persistent://prop-xyz/ns1/ds2"); + admin.topics().deleteSubscription("persistent://prop-xyz/unloadBundle/ds2", "my-sub"); + admin.topics().delete("persistent://prop-xyz/unloadBundle/ds2"); } @Test(dataProvider = "numBundles") public void testNamespaceBundleUnload(Integer numBundles) throws Exception { + cleanup(); + setup(); + admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Sets.newHashSet("test")); @@ -1775,15 +1818,25 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Sets.newHashSet("test")); // create consumer and subscription - pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub") - .subscribe(); - pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub-1") - .subscribe(); - pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub-2") + @Cleanup + Consumer<byte[]> consumer = + pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub") + .subscribe(); + @Cleanup + Consumer<byte[]> consumer2 = + pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub-1") + .subscribe(); + @Cleanup + Consumer<byte[]> consumer3 = + pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub-2") .subscribe(); - pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1").subscriptionName("my-sub") + @Cleanup + Consumer<byte[]> consumer4 = + pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1").subscriptionName("my-sub") .subscribe(); - pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1").subscriptionName("my-sub-1") + @Cleanup + Consumer<byte[]> consumer5 = + pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1").subscriptionName("my-sub-1") .subscribe(); // Create producer @@ -1847,7 +1900,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { .subscriptionName("my-sub").subscribe(); Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2") .subscriptionName("my-sub-1").subscribe(); - /* Consumer consumer3 = */ pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2") + @Cleanup + Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2") .subscriptionName("my-sub-2").subscribe(); Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1") .subscriptionName("my-sub").subscribe(); @@ -2606,7 +2660,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { final int numOfPartitions = 4; admin.topics().createPartitionedTopic(topic1, numOfPartitions); // Create a consumer to get stats on this topic - pulsarClient.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe(); + @Cleanup + Consumer<byte[]> consumer = + pulsarClient.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe(); TopicsImpl persistent = (TopicsImpl) admin.topics(); Field field = TopicsImpl.class.getDeclaredField("adminV2Topics"); @@ -2861,12 +2917,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException { admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 10); final String topic = "persistent://prop-xyz/ns1/testTopicStatsLastExpireTimestampForSubscription"; + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .create(); for (int i = 0; i < 10; i++) { producer.send(new byte[1024 * i * 5]); } + @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName("sub-1") @@ -2941,10 +2999,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception { final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages"; + @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName("sub-1") .subscribe(); + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .create(); @@ -2980,6 +3040,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { public void testGetReadPositionWhenJoining() throws Exception { final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString(); final String subName = "my-sub"; + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false) @@ -2991,12 +3052,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes()); } + List<Consumer<byte[]>> consumers = new ArrayList<>(); for (int i = 0; i < 2; i++) { - pulsarClient.newConsumer() + Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionType(SubscriptionType.Key_Shared) .subscriptionName(subName) .subscribe(); + consumers.add(consumer); } TopicStats stats = admin.topics().getStats(topic); @@ -3007,6 +3070,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { ConsumerStats consumerStats = subStats.getConsumers().get(0); Assert.assertEquals(consumerStats.getReadPositionWhenJoining(), PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString()); + + for (Consumer<byte[]> consumer : consumers) { + consumer.close(); + } } @Test @@ -3019,13 +3086,15 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.topics().createPartitionedTopic(topic, numPartitions); for (int i = 0; i < 2; i++) { - pulsarClient.newConsumer() + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionType(SubscriptionType.Shared) .subscriptionName(subName) .subscribe(); } + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false)
