This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f9705346809 [improve][broker] checkTopicExists supports checking
partitioned topic without index (#21701)
f9705346809 is described below
commit f97053468094103af7c7bb488bfb0973e3f37e6f
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Dec 19 12:00:54 2023 +0800
[improve][broker] checkTopicExists supports checking partitioned topic
without index (#21701)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../pulsar/broker/namespace/NamespaceService.java | 66 +++++++++++-----------
.../broker/namespace/NamespaceServiceTest.java | 27 +++++++++
2 files changed, 59 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 0d35e7cad69..4a54d4e0908 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1351,42 +1351,40 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
- if (topic.isPersistent()) {
- if (topic.isPartitioned()) {
- return pulsar.getBrokerService()
-
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
- .thenCompose(metadata -> {
- // Allow creating the non-partitioned persistent
topic that name includes `-partition-`
- if (metadata.partitions == 0
- || topic.getPartitionIndex() <
metadata.partitions) {
- return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
- }
- return CompletableFuture.completedFuture(false);
- });
- } else {
- return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
- }
+ CompletableFuture<Boolean> future;
+ // If the topic is persistent and the name includes `-partition-`,
find the topic from the managed/ledger.
+ if (topic.isPersistent() && topic.isPartitioned()) {
+ future =
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
- if (topic.isPartitioned()) {
- final TopicName partitionedTopicName =
TopicName.get(topic.getPartitionedTopicName());
- return pulsar.getBrokerService()
-
.fetchPartitionedTopicMetadataAsync(partitionedTopicName)
- .thenApply((metadata) -> topic.getPartitionIndex() <
metadata.partitions);
- } else {
- // only checks and don't do any topic creating and loading.
- CompletableFuture<Optional<Topic>> topicFuture =
-
pulsar.getBrokerService().getTopics().get(topic.toString());
- if (topicFuture == null) {
- return CompletableFuture.completedFuture(false);
- } else {
- return
topicFuture.thenApply(Optional::isPresent).exceptionally(throwable -> {
- LOG.warn("[{}] topicFuture completed with exception
when checkTopicExists, {}",
- topic, throwable.getMessage());
- return false;
- });
- }
- }
+ future = CompletableFuture.completedFuture(false);
}
+
+ return future.thenCompose(found -> {
+ if (found != null && found) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ return pulsar.getBrokerService()
+
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
+ .thenCompose(metadata -> {
+ if (metadata.partitions > 0) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (topic.isPersistent()) {
+ return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+ } else {
+ // The non-partitioned non-persistent topic only
exist in the broker topics.
+ CompletableFuture<Optional<Topic>>
nonPersistentTopicFuture =
+
pulsar.getBrokerService().getTopics().get(topic.toString());
+ if (nonPersistentTopicFuture == null) {
+ return
CompletableFuture.completedFuture(false);
+ } else {
+ return
nonPersistentTopicFuture.thenApply(Optional::isPresent);
+ }
+ }
+ });
+ });
}
public CompletableFuture<List<String>> getListOfTopics(NamespaceName
namespaceName, Mode mode) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 2e584489c06..c22e49e5fea 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -73,6 +74,7 @@ import
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
@@ -95,6 +97,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "flaky")
@@ -799,6 +802,30 @@ public class NamespaceServiceTest extends BrokerTestBase {
assertFalse(getResult.isPresent());
}
+ @DataProvider(name = "topicDomain")
+ public Object[] topicDomain() {
+ return new Object[]{
+ TopicDomain.persistent.value(),
+ TopicDomain.non_persistent.value()
+ };
+ }
+
+ @Test(dataProvider = "topicDomain")
+ public void testCheckTopicExists(String topicDomain) throws Exception {
+ String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Awaitility.await().untilAsserted(() -> {
+
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get());
+ });
+
+ String partitionedTopic = topicDomain + "://prop/ns-abc/" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(partitionedTopic, 5);
+ Awaitility.await().untilAsserted(() -> {
+
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get());
+
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic
+ "-partition-2")).get());
+ });
+ }
+
/**
* 1. Manually trigger "LoadReportUpdaterTask"
* 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".