This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f9705346809 [improve][broker] checkTopicExists supports checking 
partitioned topic without index (#21701)
f9705346809 is described below

commit f97053468094103af7c7bb488bfb0973e3f37e6f
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Dec 19 12:00:54 2023 +0800

    [improve][broker] checkTopicExists supports checking partitioned topic 
without index (#21701)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../pulsar/broker/namespace/NamespaceService.java  | 66 +++++++++++-----------
 .../broker/namespace/NamespaceServiceTest.java     | 27 +++++++++
 2 files changed, 59 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 0d35e7cad69..4a54d4e0908 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1351,42 +1351,40 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
-        if (topic.isPersistent()) {
-            if (topic.isPartitioned()) {
-                return pulsar.getBrokerService()
-                        
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
-                        .thenCompose(metadata -> {
-                            // Allow creating the non-partitioned persistent 
topic that name includes `-partition-`
-                            if (metadata.partitions == 0
-                                    || topic.getPartitionIndex() < 
metadata.partitions) {
-                                return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
-                            }
-                            return CompletableFuture.completedFuture(false);
-                        });
-            } else {
-                return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
-            }
+        CompletableFuture<Boolean> future;
+        // If the topic is persistent and the name includes `-partition-`, 
find the topic from the managed/ledger.
+        if (topic.isPersistent() && topic.isPartitioned()) {
+            future = 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
         } else {
-            if (topic.isPartitioned()) {
-                final TopicName partitionedTopicName = 
TopicName.get(topic.getPartitionedTopicName());
-                return pulsar.getBrokerService()
-                        
.fetchPartitionedTopicMetadataAsync(partitionedTopicName)
-                        .thenApply((metadata) -> topic.getPartitionIndex() < 
metadata.partitions);
-            } else {
-                // only checks and don't do any topic creating and loading.
-                CompletableFuture<Optional<Topic>> topicFuture =
-                        
pulsar.getBrokerService().getTopics().get(topic.toString());
-                if (topicFuture == null) {
-                    return CompletableFuture.completedFuture(false);
-                } else {
-                    return 
topicFuture.thenApply(Optional::isPresent).exceptionally(throwable -> {
-                        LOG.warn("[{}] topicFuture completed with exception 
when checkTopicExists, {}",
-                                topic, throwable.getMessage());
-                        return false;
-                    });
-                }
-            }
+            future = CompletableFuture.completedFuture(false);
         }
+
+        return future.thenCompose(found -> {
+            if (found != null && found) {
+                return CompletableFuture.completedFuture(true);
+            }
+
+            return pulsar.getBrokerService()
+                    
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
+                    .thenCompose(metadata -> {
+                        if (metadata.partitions > 0) {
+                            return CompletableFuture.completedFuture(true);
+                        }
+
+                        if (topic.isPersistent()) {
+                            return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+                        } else {
+                            // The non-partitioned non-persistent topic only 
exist in the broker topics.
+                            CompletableFuture<Optional<Topic>> 
nonPersistentTopicFuture =
+                                    
pulsar.getBrokerService().getTopics().get(topic.toString());
+                            if (nonPersistentTopicFuture == null) {
+                                return 
CompletableFuture.completedFuture(false);
+                            } else {
+                                return 
nonPersistentTopicFuture.thenApply(Optional::isPresent);
+                            }
+                        }
+                    });
+        });
     }
 
     public CompletableFuture<List<String>> getListOfTopics(NamespaceName 
namespaceName, Mode mode) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 2e584489c06..c22e49e5fea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -73,6 +74,7 @@ import 
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
@@ -95,6 +97,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "flaky")
@@ -799,6 +802,30 @@ public class NamespaceServiceTest extends BrokerTestBase {
         assertFalse(getResult.isPresent());
     }
 
+    @DataProvider(name = "topicDomain")
+    public Object[] topicDomain() {
+        return new Object[]{
+                TopicDomain.persistent.value(),
+                TopicDomain.non_persistent.value()
+        };
+    }
+
+    @Test(dataProvider = "topicDomain")
+    public void testCheckTopicExists(String topicDomain) throws Exception {
+        String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get());
+        });
+
+        String partitionedTopic = topicDomain + "://prop/ns-abc/" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(partitionedTopic, 5);
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get());
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic
 + "-partition-2")).get());
+        });
+    }
+
     /**
      * 1. Manually trigger "LoadReportUpdaterTask"
      * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".

Reply via email to