gaoran10 commented on code in PR #22206:
URL: https://github.com/apache/pulsar/pull/22206#discussion_r1607978894


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java:
##########
@@ -308,14 +308,32 @@ static ClientBuilder builder() {
      *
      * <p>This can be used to discover the partitions and create {@link Reader}, {@link Consumer} or {@link Producer}
      * instances directly on a particular partition.
-     *
+     * @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}.
      * @param topic
      *            the topic name
      * @return a future that will yield a list of the topic partitions or {@link PulsarClientException} if there was any
      *         error in the operation.
+     *
      * @since 2.3.0
      */
-    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+    @Deprecated
+    default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+        return getPartitionsForTopic(topic, true);
+    }
+
+    /**
+     * 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a
+     *   partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
+     * 2. When {@param metadataAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned
+     *   topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.

Review Comment:
   Maybe this should be `PulsarClientException.TopicDoesNotExistException`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -607,35 +609,89 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
-                    unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
-                        .handle((metadata, ex) -> {
-                                if (ex == null) {
-                                    int partitions = metadata.partitions;
-                                    commandSender.sendPartitionMetadataResponse(partitions, requestId);
-                                } else {
-                                    if (ex instanceof PulsarClientException) {
-                                        log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
-                                                remoteAddress, topicName, ex.getMessage());
-                                        commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
-                                                ex.getMessage(), requestId);
+                    // Get if exists, respond not found error if not exists.
+                    getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate -> {
+                        boolean autoCreateIfNotExist = partitionMetadata.isMetadataAutoCreationEnabled();
+                        if (!autoCreateIfNotExist) {
+                            final NamespaceResources namespaceResources = getBrokerService().pulsar()
+                                    .getPulsarResources().getNamespaceResources();
+                            final TopicResources topicResources = getBrokerService().pulsar().getPulsarResources()
+                                    .getTopicResources();
+                            namespaceResources.getPartitionedTopicResources()
+                                .getPartitionedTopicMetadataAsync(topicName, false)
+                                .thenAccept(metadata -> {
+                                    if (metadata.isPresent()) {
+                                        commandSender.sendPartitionMetadataResponse(metadata.get().partitions,
+                                                requestId);
+                                        lookupSemaphore.release();
+                                        return;
+                                    }
+                                    if (topicName.isPersistent()) {
+                                        topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
+                                            if (exists) {
+                                                commandSender.sendPartitionMetadataResponse(0, requestId);
+                                                lookupSemaphore.release();
+                                                return;
+                                            }
+                                            writeAndFlush(Commands.newPartitionMetadataResponse(
+                                                    ServerError.TopicNotFound, "", requestId));
+                                            lookupSemaphore.release();
+                                            return;
+                                        }).exceptionally(ex -> {
+                                            log.error("{} {} Failed to get partition metadata", topicName,
+                                                    ServerCnx.this.toString(), ex);
+                                            writeAndFlush(
+                                                    Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                                                            "Failed to check partition metadata",
+                                                            requestId));
+                                            lookupSemaphore.release();
+                                            return null;
+                                        });
+                                    }

Review Comment:
   Do we need to release the `lookupSemaphore` when handling a non-persistent topic?
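
   To illustrate the concern, here is a minimal, self-contained sketch (not the PR's code) of one way to make sure the permit is returned on every path: release it once in a single `whenComplete` stage instead of in each branch. `LookupPermitSketch`, `Response`, and `respond` are hypothetical names, the completed futures stand in for the real metadata lookups, and treating a non-persistent topic as "not found" is only an assumption made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

public class LookupPermitSketch {
    /** Hypothetical stand-in for the broker's response codes. */
    enum Response { FOUND, TOPIC_NOT_FOUND, METADATA_ERROR }

    /**
     * Resolves a response for a topic and releases the lookup permit exactly once,
     * whichever branch was taken (persistent, non-persistent, or failure).
     */
    static CompletableFuture<Response> respond(boolean persistent, boolean exists,
                                               Semaphore lookupSemaphore) {
        // Stand-in for an asynchronous existence check on a persistent topic.
        // Assumption for this sketch only: a non-persistent topic is reported as not found.
        CompletableFuture<Boolean> existsFuture = persistent
                ? CompletableFuture.completedFuture(exists)
                : CompletableFuture.completedFuture(false);

        return existsFuture
                .thenApply(found -> found ? Response.FOUND : Response.TOPIC_NOT_FOUND)
                // Map any lookup failure to an error response instead of propagating it.
                .exceptionally(ex -> Response.METADATA_ERROR)
                // Single release point: runs on success and on failure alike.
                .whenComplete((response, ex) -> lookupSemaphore.release());
    }

    public static void main(String[] args) {
        Semaphore lookupSemaphore = new Semaphore(1);
        lookupSemaphore.tryAcquire(); // take the permit, as a lookup request would
        Response response = respond(false, false, lookupSemaphore).join();
        System.out.println(response + ", permits available: " + lookupSemaphore.availablePermits());
    }
}
```

   With a single release point, adding a new branch (such as the non-persistent case) cannot forget the `release()` call.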


