This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a49fbf6b6a6 [improve][broker] Using TopicName instead of String as the
parameter for `getTopic`. (#17416)
a49fbf6b6a6 is described below
commit a49fbf6b6a67c7ce11983e7105221997f8d95a3a
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Sep 5 13:07:59 2022 +0800
[improve][broker] Using TopicName instead of String as the parameter for
`getTopic`. (#17416)
---
.../pulsar/broker/service/BrokerService.java | 26 +++++++++++++---------
.../pulsar/broker/service/BrokerServiceTest.java | 24 ++++++++++++++++++++
2 files changed, 39 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8985cb9ec07..35a5e8ad34d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -953,13 +953,18 @@ public class BrokerService implements Closeable {
public CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing,
Map<String, String>
properties) {
+ return getTopic(TopicName.get(topic), createIfMissing, properties);
+ }
+
+ public CompletableFuture<Optional<Topic>> getTopic(final TopicName
topicName, boolean createIfMissing,
+ Map<String, String>
properties) {
try {
- CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
+ CompletableFuture<Optional<Topic>> topicFuture =
topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() &&
!topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
- topics.remove(topic, topicFuture);
+ topics.remove(topicName.toString(), topicFuture);
} else {
// a non-existing topic in the cache shouldn't prevent
creating a topic
if (createIfMissing) {
@@ -969,7 +974,7 @@ public class BrokerService implements Closeable {
return topicFuture.thenCompose(value -> {
if (!value.isPresent()) {
// retry and create topic
- return getTopic(topic, createIfMissing,
properties);
+ return getTopic(topicName,
createIfMissing, properties);
} else {
// in-progress future completed
successfully
return
CompletableFuture.completedFuture(value);
@@ -981,14 +986,13 @@ public class BrokerService implements Closeable {
}
}
}
- final boolean isPersistentTopic =
TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
+ final boolean isPersistentTopic =
topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
- return topics.computeIfAbsent(topic, (topicName) -> {
- return this.loadOrCreatePersistentTopic(topicName,
createIfMissing, properties);
+ return topics.computeIfAbsent(topicName.toString(), (k) -> {
+ return this.loadOrCreatePersistentTopic(k,
createIfMissing, properties);
});
} else {
- return topics.computeIfAbsent(topic, (name) -> {
- final TopicName topicName = TopicName.get(name);
+ return topics.computeIfAbsent(topicName.toString(), (name) -> {
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
-> {
@@ -1005,14 +1009,14 @@ public class BrokerService implements Closeable {
});
}
} catch (IllegalArgumentException e) {
- log.warn("[{}] Illegalargument exception when loading topic",
topic, e);
+ log.warn("[{}] Illegalargument exception when loading topic",
topicName, e);
return FutureUtil.failedFuture(e);
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause instanceof ServiceUnitNotReadyException) {
- log.warn("[{}] Service unit is not ready when loading the
topic", topic);
+ log.warn("[{}] Service unit is not ready when loading the
topic", topicName);
} else {
- log.warn("[{}] Unexpected exception when loading topic: {}",
topic, e.getMessage(), e);
+ log.warn("[{}] Unexpected exception when loading topic: {}",
topicName, e.getMessage(), e);
}
return FutureUtil.failedFuture(cause);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 1fb63470456..a2ce2c23ffe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
@@ -98,6 +99,7 @@ import
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
@@ -1474,4 +1476,26 @@ public class BrokerServiceTest extends BrokerTestBase {
assertTrue(brokerService.isSystemTopic("persistent://" +
heartbeatNamespaceV1.toString() + "/healthcheck"));
assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString()
+ "/healthcheck"));
}
+
+ @Test
+ public void testGetTopic() throws Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName = ns + "/topic-1";
+
admin.topics().createNonPartitionedTopic(String.format("persistent://%s",
topicName));
+ Producer<String> producer1 =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ producer1.close();
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName.toString(), false).get().get();
+ persistentTopic.close().join();
+ List<String> topics = new
ArrayList<>(pulsar.getBrokerService().getTopics().keys());
+ topics.removeIf(item ->
item.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME));
+ Assert.assertEquals(topics.size(), 0);
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("sub-1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ }
}