This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b82dca1863d6eeb42c2f0cd0a98fe1787e6082f9 Author: Jiwei Guo <[email protected]> AuthorDate: Tue Nov 15 21:30:24 2022 +0800 [improve][test] Add subscribing regex topic test for `delete_when_subscriptions_caught_up`. (#18368) (cherry picked from commit 868458a8c60ea0b3b66716895d0965da177e7c90) --- .../broker/service/InactiveTopicDeleteTest.java | 39 ++++++++++++++++++---- .../policies/data/InactiveTopicDeleteMode.java | 2 +- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index c972d0ea5bb..87075c587bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -32,7 +32,10 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; @@ -313,30 +316,54 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { super.baseSetup(); final String topic = "persistent://prop/ns-abc/testDeleteWhenNoBacklogs"; - + final String topic2 = "persistent://prop/ns-abc/testDeleteWhenNoBacklogsB"; Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .create(); + Producer<byte[]> producer2 = pulsarClient.newProducer() + .topic(topic2) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName("sub") .subscribe(); - for (int i = 0; i < 10; i++) { + Consumer<byte[]> consumer2 = pulsarClient.newConsumer() + .topicsPattern("persistent://prop/ns-abc/test.*") + .subscriptionName("sub2") + .subscribe(); + + int producedCount = 10; + for (int i = 0; i < producedCount; i++) { producer.send("Pulsar".getBytes()); + producer2.send("Pulsar".getBytes()); } producer.close(); + producer2.close(); + int receivedCount = 0; + Message<byte[]> msg; + while((msg = consumer2.receive(1, TimeUnit.SECONDS)) != null) { + consumer2.acknowledge(msg); + receivedCount ++; + } + assertEquals(producedCount * 2, receivedCount); Thread.sleep(2000); - Assert.assertTrue(admin.topics().getList("prop/ns-abc") - .contains(topic)); + Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(topic)); admin.topics().skipAllMessages(topic, "sub"); - Awaitility.await() - .untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic))); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(consumer.isConnected()); + final List<ConsumerImpl> consumers = ((MultiTopicsConsumerImpl) consumer2).getConsumers(); + consumers.forEach(c -> Assert.assertFalse(c.isConnected())); + Assert.assertFalse(consumer2.isConnected()); + Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)); + Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic2)); + }); consumer.close(); + consumer2.close(); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java index eb7d1ba3ee7..7b0db5898ef 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java @@ -29,7 +29,7 @@ public enum InactiveTopicDeleteMode { delete_when_no_subscriptions, /** - * The topic can be deleted when all subscriptions catchup and no active producers/consumers. + * The topic can be deleted when all subscriptions catchup and no active producers. */ delete_when_subscriptions_caught_up }
