codelipenghui commented on code in PR #21545:
URL: https://github.com/apache/pulsar/pull/21545#discussion_r1386538040
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java:
##########
@@ -186,4 +191,56 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
}
}
+
+ @Test
+ public void testAutoCreationGotNotFoundException() throws PulsarAdminException, PulsarClientException {
+ final String namespace = "public/test_1";
+ final String topicName = "persistent://public/test_1/test_auto_creation_got_not_found"
+ + System.currentTimeMillis();
+ final int retryTimes = 30;
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType("non-partitioned")
+ .build());
+
+ @Cleanup("shutdown")
+ final ExecutorService executor1 = Executors.newSingleThreadExecutor();
+
+ @Cleanup("shutdown")
+ final ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+ for (int i = 0; i < retryTimes; i++) {
+ final CompletableFuture<Void> adminListSub = CompletableFuture.runAsync(() -> {
+ try {
+ admin.topics().getSubscriptions(topicName);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor1);
+
+ final CompletableFuture<Consumer<byte[]>> consumerSub = CompletableFuture.supplyAsync(() -> {
+ try {
+ return pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("sub-1")
+ .subscribe();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor2);
+
+ try {
+ adminListSub.join();
+ } catch (Throwable ex) {
+ System.out.println(ex);
+ }
Review Comment:
We don't need to catch the exception, right?
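For context on this question: `CompletableFuture.join()` rethrows a failure as an unchecked `CompletionException`, so without the surrounding `try`/`catch` a failing `getSubscriptions` call would presumably fail the test directly instead of being printed and ignored. A minimal, self-contained sketch of that behavior follows; the class and method names are hypothetical and nothing here touches the Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinPropagationSketch {

    // Hypothetical stand-in for the admin call: the task wraps a checked-style
    // failure in a RuntimeException, as the test in the diff does.
    static CompletableFuture<Void> failingCall() {
        return CompletableFuture.runAsync(() -> {
            throw new RuntimeException(new IllegalStateException("topic not found"));
        });
    }

    // join() declares no checked exceptions; a failed future surfaces as a
    // CompletionException whose cause is the exception thrown by the task.
    static String joinAndDescribe(CompletableFuture<Void> future) {
        try {
            future.join();
            return "completed";
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinAndDescribe(failingCall()));
        System.out.println(joinAndDescribe(CompletableFuture.completedFuture(null)));
    }
}
```

In a test, leaving `join()` uncaught lets the `CompletionException` propagate, so the test method fails as soon as the race reappears.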
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1075,10 +1075,23 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}
- return CompletableFuture.completedFuture(Optional.empty());
+ final String info =
+ String.format("Creating a topic encountered an illegal partition name."
+ + " topic_name=%s metadata_partition_number=%s",
+ topicName, metadata.partitions);
+ log.warn(info);
+ return CompletableFuture
+ .failedFuture(new BrokerServiceException.NamingException(info));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
Review Comment:
Can we change `optionalTopic` -> `Pair<isValidTopic, optionalTopic>` so that
we don't need to return a failed future at
https://github.com/apache/pulsar/pull/21545/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8bbbbc704c6ed6e317f5R1083?
Then we should retry loading the topic only when
`if (isValidTopic && optionalTopic.isEmpty() && createIfMissing)`.
Following this pattern, we will not change any behavior related to the
invalid partition name. If it also makes sense to improve the handling of
invalid partition names, that should happen in another PR.
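A rough sketch of the pattern suggested above, assuming a small holder class in place of `Pair<isValidTopic, optionalTopic>`. Everything in it (`LoadResult`, `load`, the simulated lost race on the first attempt, the `String` topic type) is hypothetical and only illustrates the control flow; it is not the `BrokerService` code.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class ValidityPairSketch {

    // Hypothetical stand-in for Pair<isValidTopic, optionalTopic>.
    static final class LoadResult {
        final boolean validTopic;
        final Optional<String> topic;

        LoadResult(boolean validTopic, Optional<String> topic) {
            this.validTopic = validTopic;
            this.topic = topic;
        }
    }

    // Hypothetical loader. An invalid partition name yields an empty result
    // (no failed future). A valid name comes back empty on the first attempt
    // to simulate the lost race, and present on later attempts.
    static CompletableFuture<LoadResult> load(String name, boolean validName, AtomicInteger attempts) {
        int attempt = attempts.incrementAndGet();
        if (!validName) {
            return CompletableFuture.completedFuture(new LoadResult(false, Optional.empty()));
        }
        Optional<String> topic = attempt == 1 ? Optional.<String>empty() : Optional.of(name);
        return CompletableFuture.completedFuture(new LoadResult(true, topic));
    }

    // Retries only when the name is valid, the topic is missing, and creation
    // was requested. For brevity this sketch retries once instead of recursing.
    static CompletableFuture<Optional<String>> getTopic(String name, boolean validName,
                                                        boolean createIfMissing, AtomicInteger attempts) {
        return load(name, validName, attempts).thenCompose(result -> {
            if (result.validTopic && !result.topic.isPresent() && createIfMissing) {
                return load(name, validName, attempts).thenApply(retry -> retry.topic);
            }
            return CompletableFuture.completedFuture(result.topic);
        });
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        Optional<String> topic = getTopic("my-topic", true, true, attempts).join();
        System.out.println("present=" + topic.isPresent() + " attempts=" + attempts.get());
    }
}
```

With this shape, the invalid-name branch keeps returning an empty `Optional` to callers, so only the race-recovery path changes behavior.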
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1075,10 +1075,23 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}
- return CompletableFuture.completedFuture(Optional.empty());
+ final String info =
+ String.format("Creating a topic encountered an illegal partition name."
+ + " topic_name=%s metadata_partition_number=%s",
+ topicName, metadata.partitions);
+ log.warn(info);
+ return CompletableFuture
+ .failedFuture(new BrokerServiceException.NamingException(info));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent() && createIfMissing) {
+ log.warn("Different topic automatic creation strategies lead to race conditions. "
+ + "Try again to try to recover. topic_name={}", topicName);
+ return getTopic(topicName, true, properties);
Review Comment:
```suggestion
return getTopic(topicName, createIfMissing, properties);
```