codelipenghui commented on code in PR #21545:
URL: https://github.com/apache/pulsar/pull/21545#discussion_r1386538040


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java:
##########
@@ -186,4 +191,56 @@ public void 
testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
         }
 
     }
+
+    @Test
+    public void testAutoCreationGotNotFoundException() throws 
PulsarAdminException, PulsarClientException {
+        final String namespace = "public/test_1";
+        final String topicName = 
"persistent://public/test_1/test_auto_creation_got_not_found"
+                + System.currentTimeMillis();
+        final int retryTimes = 30;
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setAutoTopicCreation(namespace, 
AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .topicType("non-partitioned")
+                .build());
+
+        @Cleanup("shutdown")
+        final ExecutorService executor1 = Executors.newSingleThreadExecutor();
+
+        @Cleanup("shutdown")
+        final ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+        for (int i = 0; i < retryTimes; i++) {
+            final CompletableFuture<Void> adminListSub = 
CompletableFuture.runAsync(() -> {
+                try {
+                    admin.topics().getSubscriptions(topicName);
+                } catch (PulsarAdminException e) {
+                    throw new RuntimeException(e);
+                }
+            }, executor1);
+
+            final CompletableFuture<Consumer<byte[]>> consumerSub = 
CompletableFuture.supplyAsync(() -> {
+                try {
+                    return pulsarClient.newConsumer()
+                            .topic(topicName)
+                            .subscriptionName("sub-1")
+                            .subscribe();
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }, executor2);
+
+            try {
+                adminListSub.join();
+            } catch (Throwable ex) {
+                System.out.println(ex);
+            }

Review Comment:
   We don't need to catch the exception, right?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1075,10 +1075,23 @@ public CompletableFuture<Optional<Topic>> 
getTopic(final TopicName topicName, bo
                                             return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
                                                     properties, topicPolicies);
                                         }
-                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                        final String info =
+                                                String.format("Creating a 
topic encountered an illegal partition name."
+                                                                + " 
topic_name=%s metadata_partition_number=%s",
+                                                        topicName, 
metadata.partitions);
+                                        log.warn(info);
+                                        return CompletableFuture
+                                                .failedFuture(new 
BrokerServiceException.NamingException(info));
                                     });
                         }
                         return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
+                    }).thenCompose(optionalTopic -> {

Review Comment:
   Can we change `optionalTopic` -> `Pair<isValidTopic, optionalTopic>` so that 
we don't need to return a failed future at 
https://github.com/apache/pulsar/pull/21545/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8bbbbc704c6ed6e317f5R1083
   
   Then we should only retry to load the topic when `if(isValidTopic && 
optionalTopic.isEmptry() && createIfMissing)`
   
   Following this pattern, we will not change any behavior related to the 
invalid partition name. If it also makes sense to improve the invalid partition 
name, it should be happened in another PR.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1075,10 +1075,23 @@ public CompletableFuture<Optional<Topic>> 
getTopic(final TopicName topicName, bo
                                             return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
                                                     properties, topicPolicies);
                                         }
-                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                        final String info =
+                                                String.format("Creating a 
topic encountered an illegal partition name."
+                                                                + " 
topic_name=%s metadata_partition_number=%s",
+                                                        topicName, 
metadata.partitions);
+                                        log.warn(info);
+                                        return CompletableFuture
+                                                .failedFuture(new 
BrokerServiceException.NamingException(info));
                                     });
                         }
                         return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
+                    }).thenCompose(optionalTopic -> {
+                        if (!optionalTopic.isPresent() && createIfMissing) {
+                            log.warn("Different topic automatic creation 
strategies lead to race conditions. "
+                                    + "Try again to try to recover. 
topic_name={}", topicName);
+                            return getTopic(topicName, true, properties);

Review Comment:
   ```suggestion
                               return getTopic(topicName, createIfMissing, 
properties);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to