This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a49fbf6b6a6 [improve][broker] Using TopicName instead of String as the parameter for `getTopic`. (#17416)
a49fbf6b6a6 is described below

commit a49fbf6b6a67c7ce11983e7105221997f8d95a3a
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Sep 5 13:07:59 2022 +0800

    [improve][broker] Using TopicName instead of String as the parameter for `getTopic`. (#17416)
---
 .../pulsar/broker/service/BrokerService.java       | 26 +++++++++++++---------
 .../pulsar/broker/service/BrokerServiceTest.java   | 24 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8985cb9ec07..35a5e8ad34d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -953,13 +953,18 @@ public class BrokerService implements Closeable {
 
     public CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing,
                                                        Map<String, String> 
properties) {
+        return getTopic(TopicName.get(topic), createIfMissing, properties);
+    }
+
+    public CompletableFuture<Optional<Topic>> getTopic(final TopicName 
topicName, boolean createIfMissing,
+                                                       Map<String, String> 
properties) {
         try {
-            CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
+            CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
             if (topicFuture != null) {
                 if (topicFuture.isCompletedExceptionally()
                         || (topicFuture.isDone() && 
!topicFuture.getNow(Optional.empty()).isPresent())) {
                     // Exceptional topics should be recreated.
-                    topics.remove(topic, topicFuture);
+                    topics.remove(topicName.toString(), topicFuture);
                 } else {
                     // a non-existing topic in the cache shouldn't prevent 
creating a topic
                     if (createIfMissing) {
@@ -969,7 +974,7 @@ public class BrokerService implements Closeable {
                             return topicFuture.thenCompose(value -> {
                                 if (!value.isPresent()) {
                                     // retry and create topic
-                                    return getTopic(topic, createIfMissing, 
properties);
+                                    return getTopic(topicName, 
createIfMissing, properties);
                                 } else {
                                     // in-progress future completed 
successfully
                                     return 
CompletableFuture.completedFuture(value);
@@ -981,14 +986,13 @@ public class BrokerService implements Closeable {
                     }
                 }
             }
-            final boolean isPersistentTopic = 
TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
+            final boolean isPersistentTopic = 
topicName.getDomain().equals(TopicDomain.persistent);
             if (isPersistentTopic) {
-                return topics.computeIfAbsent(topic, (topicName) -> {
-                    return this.loadOrCreatePersistentTopic(topicName, 
createIfMissing, properties);
+                return topics.computeIfAbsent(topicName.toString(), (k) -> {
+                    return this.loadOrCreatePersistentTopic(k, 
createIfMissing, properties);
                 });
             } else {
-                return topics.computeIfAbsent(topic, (name) -> {
-                    final TopicName topicName = TopicName.get(name);
+                return topics.computeIfAbsent(topicName.toString(), (name) -> {
                     if (topicName.isPartitioned()) {
                         final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
                         return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
@@ -1005,14 +1009,14 @@ public class BrokerService implements Closeable {
                 });
             }
         } catch (IllegalArgumentException e) {
-            log.warn("[{}] Illegalargument exception when loading topic", 
topic, e);
+            log.warn("[{}] Illegalargument exception when loading topic", 
topicName, e);
             return FutureUtil.failedFuture(e);
         } catch (RuntimeException e) {
             Throwable cause = e.getCause();
             if (cause instanceof ServiceUnitNotReadyException) {
-                log.warn("[{}] Service unit is not ready when loading the 
topic", topic);
+                log.warn("[{}] Service unit is not ready when loading the 
topic", topicName);
             } else {
-                log.warn("[{}] Unexpected exception when loading topic: {}", 
topic, e.getMessage(), e);
+                log.warn("[{}] Unexpected exception when loading topic: {}", 
topicName, e.getMessage(), e);
             }
 
             return FutureUtil.failedFuture(cause);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 1fb63470456..a2ce2c23ffe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
@@ -98,6 +99,7 @@ import 
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
@@ -1474,4 +1476,26 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertTrue(brokerService.isSystemTopic("persistent://" + 
heartbeatNamespaceV1.toString() + "/healthcheck"));
         assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() 
+ "/healthcheck"));
     }
+
+    @Test
+    public void testGetTopic() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = ns + "/topic-1";
+        
admin.topics().createNonPartitionedTopic(String.format("persistent://%s", 
topicName));
+        Producer<String> producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer1.close();
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName.toString(), false).get().get();
+        persistentTopic.close().join();
+        List<String> topics = new 
ArrayList<>(pulsar.getBrokerService().getTopics().keys());
+        topics.removeIf(item -> 
item.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME));
+        Assert.assertEquals(topics.size(), 0);
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("sub-1")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+    }
 }

Reply via email to