poorbarcode commented on code in PR #22206:
URL: https://github.com/apache/pulsar/pull/22206#discussion_r1607046581


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java:
##########
@@ -308,14 +308,32 @@ static ClientBuilder builder() {
      *
      * <p>This can be used to discover the partitions and create {@link Reader}, {@link Consumer} or {@link Producer}
      * instances directly on a particular partition.
-     *
+     * @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}.
      * @param topic
      *            the topic name
      * @return a future that will yield a list of the topic partitions or {@link PulsarClientException} if there was any
      *         error in the operation.
+     *
      * @since 2.3.0
      */
-    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+    @Deprecated
+    default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+        return getPartitionsForTopic(topic, true);
+    }
+
+    /**
+     * 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a
+     *   partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
+     * 2. When {@param metadataAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned
+     *   topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.
+     *  2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older
+     *    one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".

Review Comment:
   Sure, fixed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -607,6 +609,44 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
+                    // Get if exists, respond not found error if not exists.
+                    if (!partitionMetadata.isMetadataAutoCreationEnabled()) {
+                        final NamespaceResources namespaceResources = getBrokerService().pulsar().getPulsarResources()
+                                .getNamespaceResources();
+                        final TopicResources topicResources = getBrokerService().pulsar().getPulsarResources()
+                                .getTopicResources();
+                        return namespaceResources.getPartitionedTopicResources()

Review Comment:
   Removed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -607,6 +609,44 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
+                    // Get if exists, respond not found error if not exists.
+                    if (!partitionMetadata.isMetadataAutoCreationEnabled()) {
+                        final NamespaceResources namespaceResources = getBrokerService().pulsar().getPulsarResources()
+                                .getNamespaceResources();
+                        final TopicResources topicResources = getBrokerService().pulsar().getPulsarResources()
+                                .getTopicResources();
+                        return namespaceResources.getPartitionedTopicResources()
+                            .getPartitionedTopicMetadataAsync(topicName, false)
+                            .thenAccept(metadata -> {
+                                if (metadata.isPresent()) {
+                                    commandSender.sendPartitionMetadataResponse(metadata.get().partitions, requestId);
+                                    lookupSemaphore.release();
+                                    return;
+                                }
+                                if (topicName.isPersistent()) {
+                                    topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
+                                        if (exists) {
+                                            commandSender.sendPartitionMetadataResponse(0, requestId);
+                                            lookupSemaphore.release();
+                                            return;
+                                        }
+                                        writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound,
+                                                        "", requestId));
+                                        lookupSemaphore.release();
+                                    }).exceptionally(ex -> {
+                                        log.error("{} {} Failed to get partition metadata", topicName,
+                                                ctx.channel(), ex);
+                                        writeAndFlush(
+                                                Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
+                                                "Failed to check partition metadata",
+                                                requestId));
+                                        lookupSemaphore.release();
+                                        return null;
+                                    });
+                                }
+                            });

Review Comment:
   Fixed. Thanks


