This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 56680581a1212f39b84a00717142b4062fa8de5d Author: Enrico Olivelli <[email protected]> AuthorDate: Tue Dec 29 09:43:20 2020 +0100 Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029) If a non-persistent topic is unused, it is automatically deleted by Pulsar. If you then request the stats for that topic name using the REST API, the topic re-appears. For example, a non-persistent topic `public/bob/np` exists in a namespace. It is returned when using the `admin/v2/non-persistent/public/bob` endpoint: ``` ["non-persistent://public/bob/np"] ``` Since this topic is unused, it gets cleaned up and is no longer returned by the endpoint: ``` [] ``` However, if you request the stats for that topic using the CLI (which calls the REST API), like this, you actually get a response (not a 404): ``` bin/pulsar-admin topics stats non-persistent://public/bob/np Warning: Nashorn engine is planned to be removed from a future JDK release { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesInCounter" : 0, "msgInCounter" : 0, "bytesOutCounter" : 0, "msgOutCounter" : 0, "averageMsgSize" : 0.0, "msgChunkPublished" : false, "storageSize" : 0, "backlogSize" : 0, "publishers" : [ ], "subscriptions" : { }, "replication" : { } } ``` And now the topic re-appears on the topic-list endpoint: ``` ["non-persistent://public/bob/np"] ``` ### Modifications When loading a non-persistent topic with createIfMissing = false, do not try to create it; simply return an empty value. Add test case. This change added tests and can be verified as in the bug description. 
Run: pulsar-admin topics create non-persistent://public/default/tmp wait for the topic to be deleted run pulsar-admin topics stats non-persistent://public/default/tmp (cherry picked from commit b860c059eb4d13969469b23c9a3bffd6bf7e5a66) --- .../pulsar/broker/service/BrokerService.java | 17 ++++-- .../broker/service/NonPersistentTopicE2ETest.java | 66 +++++++++++++++++++++- 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 5348397..32bf07e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -759,10 +759,19 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } } final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); - return topics.computeIfAbsent(topic, (topicName) -> { - return isPersistentTopic ? 
this.loadOrCreatePersistentTopic(topicName, createIfMissing) - : createNonPersistentTopic(topicName); - }); + if (isPersistentTopic) { + return topics.computeIfAbsent(topic, (topicName) -> { + return this.loadOrCreatePersistentTopic(topicName, createIfMissing); + }); + } else { + return topics.computeIfAbsent(topic, (topicName) -> { + if (createIfMissing) { + return createNonPersistentTopic(topicName); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }); + } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topic, e); return failedFuture(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java index 1bbb20f..fab7a5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java @@ -37,6 +37,8 @@ import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -48,6 +50,8 @@ import static org.testng.Assert.assertTrue; public class NonPersistentTopicE2ETest extends BrokerTestBase { + private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class); + @BeforeMethod @Override protected void setup() throws Exception { @@ -102,8 +106,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase { assertFalse(topic.isPresent()); assertFalse(topicHasSchema(topicName)); - // 2. 
Topic is not GCed with live connection - topicName = "non-persistent://prop/ns-abc/topic-2"; + // 1a. Topic that add/removes subscription can be GC'd + topicName = "non-persistent://prop/ns-abc/topic-1a"; String subName = "sub1"; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); topic = getTopic(topicName); @@ -111,6 +115,23 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase { topic.get().addSchema(schemaData).join(); assertTrue(topicHasSchema(topicName)); + admin.topics().deleteSubscription(topicName, subName); + consumer.close(); + + runGC(); + topic = getTopic(topicName); + assertFalse(topic.isPresent()); + assertFalse(topicHasSchema(topicName)); + + // 2. Topic is not GCed with live connection + topicName = "non-persistent://prop/ns-abc/topic-2"; + subName = "sub1"; + consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + topic = getTopic(topicName); + assertTrue(topic.isPresent()); + topic.get().addSchema(schemaData).join(); + assertTrue(topicHasSchema(topicName)); + runGC(); topic = getTopic(topicName); assertTrue(topic.isPresent()); @@ -170,4 +191,45 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase { producer2.close(); } + @Test + public void testGC() throws Exception { + // 1. Simple successful GC + String topicName = "non-persistent://prop/ns-abc/topic-10"; + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + producer.close(); + + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + runGC(); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 2. Topic is not GCed with live connection + String subName = "sub1"; + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + + runGC(); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 3. 
Topic with subscription is not GCed even with no connections + consumer.close(); + + runGC(); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 4. Topic can be GCed after unsubscribe + admin.topics().deleteSubscription(topicName, subName); + + runGC(); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + // 5. Get the topic and make sure it doesn't come back + admin.lookups().lookupTopic(topicName); + Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join(); + assertFalse(topic.isPresent()); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // write again, the topic will be available + Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create(); + producer2.close(); + + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + } }
