This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a93f30f836e [fix][broker] Allow user lookup topic name with
`-partition-` but no metadata (#19171)
a93f30f836e is described below
commit a93f30f836e1766da9a8b0d9c16e58fc872a7777
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jan 12 17:21:21 2023 +0800
[fix][broker] Allow user lookup topic name with `-partition-` but no
metadata (#19171)
---
.../pulsar/broker/namespace/NamespaceService.java | 15 +++++++----
.../service/persistent/PersistentTopicTest.java | 31 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 27df77e815d..84bce75bf5a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1165,11 +1165,16 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
if (topic.isPersistent()) {
if (topic.isPartitioned()) {
- return
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-
.partitionedTopicExistsAsync(TopicName.get(topic.getPartitionedTopicName()))
- .thenCompose(exists -> exists
- ?
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic)
- : CompletableFuture.completedFuture(false));
+ return pulsar.getBrokerService()
+
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
+ .thenCompose(metadata -> {
+ // Allow creating a non-partitioned persistent topic whose name includes `-partition-`
+ if (metadata.partitions == 0
+ || topic.getPartitionIndex() <
metadata.partitions) {
+ return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+ }
+ return CompletableFuture.completedFuture(false);
+ });
} else {
return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 19c5bd5c9aa..6f9c260c8ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -422,4 +422,35 @@ public class PersistentTopicTest extends BrokerTestBase {
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}
+
+ @Test
+ public void testCompatibilityWithPartitionKeyword() throws PulsarAdminException, PulsarClientException {
+ final String topicName = "persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword";
+ TopicName topicNameEntity = TopicName.get(topicName);
+ String partition2 = topicNameEntity.getPartition(2).toString();
+ // Create a non-partitioned topic whose name contains the `-partition-` keyword.
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(partition2)
+ .create();
+ List<String> topics = admin.topics().getList("prop/ns-abc");
+ // Close the first producer so the next one simulates a reconnect.
+ producer.close();
+ // Disable auto topic creation so the reconnect can only succeed if the topic already exists.
+ conf.setAllowAutoTopicCreation(false);
+ // Check that the topic exists in the list.
+ Assert.assertTrue(topics.contains(partition2));
+ // Check that this topic has no partitioned-topic metadata.
+ Assert.assertThrows(PulsarAdminException.NotFoundException.class,
+ () -> admin.topics().getPartitionedTopicMetadata(topicName));
+ // Reconnect to the broker; this is expected to succeed because the topic already exists.
+ producer = pulsarClient.newProducer()
+ .topic(partition2)
+ .create();
+ producer.close();
+ // Re-fetch the list (the earlier snapshot would be trivially true) and check the topic is still there.
+ Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(partition2));
+ // Check that this topic still has no partitioned-topic metadata.
+ Assert.assertThrows(PulsarAdminException.NotFoundException.class,
+ () -> admin.topics().getPartitionedTopicMetadata(topicName));
+ }
}