This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a93f30f836e [fix][broker] Allow user lookup topic name with `-partition-` but no metadata (#19171)
a93f30f836e is described below

commit a93f30f836e1766da9a8b0d9c16e58fc872a7777
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jan 12 17:21:21 2023 +0800

    [fix][broker] Allow user lookup topic name with `-partition-` but no metadata (#19171)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 15 +++++++----
 .../service/persistent/PersistentTopicTest.java    | 31 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 27df77e815d..84bce75bf5a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1165,11 +1165,16 @@ public class NamespaceService implements AutoCloseable {
     public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
         if (topic.isPersistent()) {
             if (topic.isPartitioned()) {
-                return 
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                        
.partitionedTopicExistsAsync(TopicName.get(topic.getPartitionedTopicName()))
-                        .thenCompose(exists -> exists
-                                ? 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic)
-                                : CompletableFuture.completedFuture(false));
+                return pulsar.getBrokerService()
+                        
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
+                        .thenCompose(metadata -> {
+                            // Allow creating the non-partitioned persistent topic that name includes `-partition-`
+                            if (metadata.partitions == 0
+                                    || topic.getPartitionIndex() < 
metadata.partitions) {
+                                return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+                            }
+                            return CompletableFuture.completedFuture(false);
+                        });
             } else {
                 return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 19c5bd5c9aa..6f9c260c8ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -422,4 +422,35 @@ public class PersistentTopicTest extends BrokerTestBase {
         }
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }
+
+    @Test
+    public void testCompatibilityWithPartitionKeyword() throws 
PulsarAdminException, PulsarClientException {
+        final String topicName = 
"persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword";
+        TopicName topicNameEntity = TopicName.get(topicName);
+        String partition2 = topicNameEntity.getPartition(2).toString();
+        // Create a non-partitioned topic with -partition- keyword
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(partition2)
+                .create();
+        List<String> topics = admin.topics().getList("prop/ns-abc");
+        // Close previous producer to simulate reconnect
+        producer.close();
+        // Disable auto topic creation
+        conf.setAllowAutoTopicCreation(false);
+        // Check the topic exist in the list.
+        Assert.assertTrue(topics.contains(partition2));
+        // Check this topic has no partition metadata.
+        Assert.assertThrows(PulsarAdminException.NotFoundException.class,
+                () -> admin.topics().getPartitionedTopicMetadata(topicName));
+        // Reconnect to the broker and expect successful because the topic has existed in the broker.
+        producer = pulsarClient.newProducer()
+                .topic(partition2)
+                .create();
+        producer.close();
+        // Check the topic exist in the list again.
+        Assert.assertTrue(topics.contains(partition2));
+        // Check this topic has no partition metadata again.
+        Assert.assertThrows(PulsarAdminException.NotFoundException.class,
+                () -> admin.topics().getPartitionedTopicMetadata(topicName));
+    }
 }

Reply via email to