This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a927ef43935798d573c23910af58f5f1dd1e916a
Author: fengyubiao <[email protected]>
AuthorDate: Thu May 23 21:15:16 2024 +0800

    [fix] [client] PIP-344 Do not create partitioned metadata when calling 
pulsarClient.getPartitionsForTopic(topicName) (#22206)
    
    (cherry picked from commit 4e5c0bcc2b44c33a966287b86c2c235be249dc51)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  26 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 110 +++--
 .../broker/admin/GetPartitionMetadataTest.java     | 473 +++++++++++++++++++++
 .../pulsar/broker/admin/TopicAutoCreationTest.java |   3 +-
 .../BrokerServiceAutoTopicCreationTest.java        |   4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |  10 +-
 .../service/BrokerServiceThrottlingTest.java       |   2 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   2 +-
 .../buffer/TransactionLowWaterMarkTest.java        |   4 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |   2 +-
 .../org/apache/pulsar/client/api/PulsarClient.java |  23 +-
 .../client/impl/BinaryProtoLookupService.java      |  13 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   4 +-
 .../pulsar/client/impl/HttpLookupService.java      |   6 +-
 .../apache/pulsar/client/impl/LookupService.java   |  27 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   2 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  32 +-
 .../TransactionCoordinatorClientImpl.java          |   3 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  12 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   |   3 +-
 .../apache/pulsar/common/protocol/Commands.java    |   7 +-
 .../org/apache/pulsar/common/util/FutureUtil.java  |   4 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   2 +
 .../pulsar/proxy/server/LookupProxyHandler.java    |   3 +-
 24 files changed, 689 insertions(+), 88 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ebb92679599..f9471b03c5f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -546,19 +546,29 @@ public class PersistentTopicsBase extends AdminResource {
                                                                           
boolean checkAllowAutoCreation) {
         return getPartitionedTopicMetadataAsync(topicName, authoritative, 
checkAllowAutoCreation)
                 .thenCompose(metadata -> {
-                    CompletableFuture<Void> ret;
-                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                    if (metadata.partitions > 1) {
+                        // Some clients does not support partitioned topic.
+                        return 
internalValidateClientVersionAsync().thenApply(__ -> metadata);
+                    } else if (metadata.partitions == 1) {
+                        return CompletableFuture.completedFuture(metadata);
+                    } else {
+                        // metadata.partitions == 0
                         // The topic may be a non-partitioned topic, so check 
if it exists here.
                         // However, when checkAllowAutoCreation is true, the 
client will create the topic if
                         // it doesn't exist. In this case, `partitions == 0` 
means the automatically created topic
                         // is a non-partitioned topic so we shouldn't check if 
the topic exists.
-                        ret = internalCheckTopicExists(topicName);
-                    } else if (metadata.partitions > 1) {
-                        ret = internalValidateClientVersionAsync();
-                    } else {
-                        ret = CompletableFuture.completedFuture(null);
+                        return 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+                                .thenCompose(brokerAllowAutoTopicCreation -> {
+                            if (checkAllowAutoCreation) {
+                                // Whether it exists or not, auto create a 
non-partitioned topic by client.
+                                return 
CompletableFuture.completedFuture(metadata);
+                            } else {
+                                // If it does not exist, response a Not Found 
error.
+                                // Otherwise, response a non-partitioned 
metadata.
+                                return 
internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+                            }
+                        });
                     }
-                    return ret.thenApply(__ -> metadata);
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index eed75e2c450..c904c0982c2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -83,6 +83,8 @@ import org.apache.pulsar.broker.limiter.ConnectionController;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TopicResources;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -613,35 +615,93 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
-                    
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
-                        .handle((metadata, ex) -> {
-                                if (ex == null) {
-                                    int partitions = metadata.partitions;
-                                    
commandSender.sendPartitionMetadataResponse(partitions, requestId);
-                                } else {
-                                    if (ex instanceof PulsarClientException) {
-                                        log.warn("Failed to authorize {} at 
[{}] on topic {} : {}", getRole(),
-                                                remoteAddress, topicName, 
ex.getMessage());
-                                        
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
-                                                ex.getMessage(), requestId);
+                    // Get if exists, respond not found error if not exists.
+                    
getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate
 -> {
+                        boolean autoCreateIfNotExist = 
partitionMetadata.isMetadataAutoCreationEnabled();
+                        if (!autoCreateIfNotExist) {
+                            final NamespaceResources namespaceResources = 
getBrokerService().pulsar()
+                                    
.getPulsarResources().getNamespaceResources();
+                            final TopicResources topicResources = 
getBrokerService().pulsar().getPulsarResources()
+                                    .getTopicResources();
+                            namespaceResources.getPartitionedTopicResources()
+                                .getPartitionedTopicMetadataAsync(topicName, 
false)
+                                .handle((metadata, getMetadataEx) -> {
+                                    if (getMetadataEx != null) {
+                                        log.error("{} {} Failed to get 
partition metadata", topicName,
+                                                ServerCnx.this.toString(), 
getMetadataEx);
+                                        writeAndFlush(
+                                                
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                                                        "Failed to get 
partition metadata",
+                                                        requestId));
+                                    } else if (metadata.isPresent()) {
+                                        
commandSender.sendPartitionMetadataResponse(metadata.get().partitions,
+                                                requestId);
+                                    } else if (topicName.isPersistent()) {
+                                        
topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
+                                            if (exists) {
+                                                
commandSender.sendPartitionMetadataResponse(0, requestId);
+                                                return;
+                                            }
+                                            
writeAndFlush(Commands.newPartitionMetadataResponse(
+                                                    ServerError.TopicNotFound, 
"", requestId));
+                                        }).exceptionally(ex -> {
+                                            log.error("{} {} Failed to get 
partition metadata", topicName,
+                                                    ServerCnx.this.toString(), 
ex);
+                                            writeAndFlush(
+                                                    
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                                                            "Failed to check 
partition metadata",
+                                                            requestId));
+                                            return null;
+                                        });
+                                    } else {
+                                        // Regarding non-persistent topic, we 
do not know whether it exists or not.
+                                        // Just return a non-partitioned 
metadata if partitioned metadata does not
+                                        // exist.
+                                        // Broker will respond a not found 
error when doing subscribing or producing if
+                                        // broker not allow to auto create 
topics.
+                                        
commandSender.sendPartitionMetadataResponse(0, requestId);
+                                    }
+                                    return null;
+                                }).whenComplete((ignore, ignoreEx) -> {
+                                    lookupSemaphore.release();
+                                    if (ignoreEx != null) {
+                                        log.error("{} {} Failed to handle 
partition metadata request", topicName,
+                                                ServerCnx.this.toString(), 
ignoreEx);
+                                    }
+                                });
+                        } else {
+                            // Get if exists, create a new one if not exists.
+                            
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
+                                .whenComplete((metadata, ex) -> {
+                                    lookupSemaphore.release();
+                                    if (ex == null) {
+                                        int partitions = metadata.partitions;
+                                        
commandSender.sendPartitionMetadataResponse(partitions, requestId);
                                     } else {
-                                        log.warn("Failed to get Partitioned 
Metadata [{}] {}: {}", remoteAddress,
-                                                topicName, ex.getMessage(), 
ex);
-                                        ServerError error = 
ServerError.ServiceNotReady;
-                                        if (ex instanceof RestException 
restException){
-                                            int responseCode = 
restException.getResponse().getStatus();
-                                            if (responseCode == 
NOT_FOUND.getStatusCode()){
-                                                error = 
ServerError.TopicNotFound;
-                                            } else if (responseCode < 
INTERNAL_SERVER_ERROR.getStatusCode()){
-                                                error = 
ServerError.MetadataError;
+                                        if (ex instanceof 
PulsarClientException) {
+                                            log.warn("Failed to authorize {} 
at [{}] on topic {} : {}", getRole(),
+                                                    remoteAddress, topicName, 
ex.getMessage());
+                                            
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
+                                                    ex.getMessage(), 
requestId);
+                                        } else {
+                                            log.warn("Failed to get 
Partitioned Metadata [{}] {}: {}", remoteAddress,
+                                                    topicName, 
ex.getMessage(), ex);
+                                            ServerError error = 
ServerError.ServiceNotReady;
+                                            if (ex instanceof RestException 
restException){
+                                                int responseCode = 
restException.getResponse().getStatus();
+                                                if (responseCode == 
NOT_FOUND.getStatusCode()){
+                                                    error = 
ServerError.TopicNotFound;
+                                                } else if (responseCode < 
INTERNAL_SERVER_ERROR.getStatusCode()){
+                                                    error = 
ServerError.MetadataError;
+                                                }
                                             }
+                                            
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(),
+                                                    requestId);
                                         }
-                                        
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId);
                                     }
-                                }
-                                lookupSemaphore.release();
-                                return null;
-                            });
+                                });
+                        }
+                    });
                 } else {
                     final String msg = "Client is not authorized to Get 
Partition Metadata";
                     log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, getPrincipal(), topicName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
new file mode 100644
index 00000000000..51f643d2b78
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Semaphore;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+@Slf4j
+public class GetPartitionMetadataTest extends ProducerConsumerBase {
+
+    private static final String DEFAULT_NS = "public/default";
+
+    private PulsarClientImpl clientWithHttpLookup;
+    private PulsarClientImpl clientWitBinaryLookup;
+
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        clientWithHttpLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        clientWitBinaryLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (clientWithHttpLookup != null) {
+            clientWithHttpLookup.close();
+        }
+        if (clientWitBinaryLookup != null) {
+            clientWitBinaryLookup.close();
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+    }
+
+    private LookupService getLookupService(boolean isUsingHttpLookup) {
+        if (isUsingHttpLookup) {
+            return clientWithHttpLookup.getLookup();
+        } else {
+            return clientWitBinaryLookup.getLookup();
+        }
+    }
+
+    @DataProvider(name = "topicDomains")
+    public Object[][] topicDomains() {
+        return new Object[][]{
+            {TopicDomain.persistent},
+            {TopicDomain.non_persistent}
+        };
+    }
+
+    @Test(dataProvider = "topicDomains")
+    public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain 
topicDomain) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        // HTTP client.
+        final String tp1 = BrokerTestUtil.newUniqueName(topicDomain.value() + 
"://" + DEFAULT_NS + "/tp");
+        clientWithHttpLookup.getPartitionsForTopic(tp1).join();
+        Optional<PartitionedTopicMetadata> metadata1 = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(TopicName.get(tp1), 
true).join();
+        assertTrue(metadata1.isPresent());
+        assertEquals(metadata1.get().partitions, 3);
+
+        // Binary client.
+        final String tp2 = BrokerTestUtil.newUniqueName(topicDomain.value() + 
"://" + DEFAULT_NS + "/tp");
+        clientWitBinaryLookup.getPartitionsForTopic(tp2).join();
+        Optional<PartitionedTopicMetadata> metadata2 = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(TopicName.get(tp2), 
true).join();
+        assertTrue(metadata2.isPresent());
+        assertEquals(metadata2.get().partitions, 3);
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        admin.topics().deletePartitionedTopic(tp1, false);
+        admin.topics().deletePartitionedTopic(tp2, false);
+    }
+
+    @DataProvider(name = "autoCreationParamsAll")
+    public Object[][] autoCreationParamsAll(){
+        return new Object[][]{
+            // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, 
isUsingHttpLookup.
+            {true, true, true, TopicDomain.persistent},
+            {true, true, false, TopicDomain.persistent},
+            {true, false, true, TopicDomain.persistent},
+            {true, false, false, TopicDomain.persistent},
+            {false, true, true, TopicDomain.persistent},
+            {false, true, false, TopicDomain.persistent},
+            {false, false, true, TopicDomain.persistent},
+            {false, false, false, TopicDomain.persistent},
+            {true, true, true, TopicDomain.non_persistent},
+            {true, true, false, TopicDomain.non_persistent},
+            {true, false, true, TopicDomain.non_persistent},
+            {true, false, false, TopicDomain.non_persistent},
+            {false, true, true, TopicDomain.non_persistent},
+            {false, true, false, TopicDomain.non_persistent},
+            {false, false, true, TopicDomain.non_persistent},
+            {false, false, false, TopicDomain.non_persistent}
+        };
+    }
+
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfNonPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                           boolean 
paramMetadataAutoCreationEnabled,
+                                                           boolean 
isUsingHttpLookup,
+                                                           TopicDomain 
topicDomain) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        admin.topics().createNonPartitionedTopic(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response =
+                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
+        assertEquals(response.partitions, 0);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertFalse(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        for (int i = 0; i < 3; i++) {
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        client.close();
+        admin.topics().delete(topicNameStr, false);
+    }
+
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                        boolean 
paramMetadataAutoCreationEnabled,
+                                                        boolean 
isUsingHttpLookup,
+                                                        TopicDomain 
topicDomain) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        admin.topics().createPartitionedTopic(topicNameStr, 3);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response =
+                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
+        assertEquals(response.partitions, 3);
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        client.close();
+        admin.topics().deletePartitionedTopic(topicNameStr, false);
+    }
+
+    @DataProvider(name = "clients")
+    public Object[][] clients(){
+        return new Object[][]{
+                // isUsingHttpLookup.
+                {true, TopicDomain.persistent},
+                {false, TopicDomain.non_persistent}
+        };
+    }
+
+    @Test(dataProvider = "clients")
+    public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, 
TopicDomain topicDomain) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response = 
lookup.getPartitionedTopicMetadata(topicName, true).join();
+        assertEquals(response.partitions, 3);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertTrue(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+        for (int i = 0; i < 3; i++) {
+            // The API "getPartitionedTopicMetadata" only creates the 
partitioned metadata, it will not create the
+            // partitions.
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        client.close();
+        admin.topics().deletePartitionedTopic(topicNameStr, false);
+    }
+
+    @Test(dataProvider = "clients")
+    public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, 
TopicDomain topicDomain) throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        conf.setAllowAutoTopicCreation(true);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Create topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        PartitionedTopicMetadata response = 
lookup.getPartitionedTopicMetadata(topicName, true).join();
+        assertEquals(response.partitions, 0);
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        assertFalse(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        client.close();
+        try {
+            admin.topics().delete(topicNameStr, false);
+        } catch (Exception ex) {}
+    }
+
+    @DataProvider(name = "autoCreationParamsNotAllow")
+    public Object[][] autoCreationParamsNotAllow(){
+        return new Object[][]{
+                // configAllowAutoTopicCreation, 
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
+                {true, false, true},
+                {true, false, false},
+                {false, false, true},
+                {false, false, false},
+                {false, true, true},
+                {false, true, false},
+        };
+    }
+
+    @Test(dataProvider = "autoCreationParamsNotAllow")
+    public void testGetMetadataIfNotAllowedCreate(boolean 
configAllowAutoTopicCreation,
+                                                  boolean 
paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) 
throws Exception {
+        if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) 
{
+            // These test cases are for the following PR.
+            // Which was described in the Motivation of 
https://github.com/apache/pulsar/pull/22206.
+            return;
+        }
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Define topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        // Verify.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        try {
+            lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), 
paramMetadataAutoCreationEnabled).join();
+            fail("Expect a not found exception");
+        } catch (Exception e) {
+            log.warn("", e);
+            Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+            assertTrue(unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                    || unwrapEx instanceof 
PulsarClientException.NotFoundException);
+        }
+
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
+        assertFalse(partitionedTopics.contains(topicNameStr));
+        List<String> topicList = admin.topics().getList("public/default");
+        assertFalse(topicList.contains(topicNameStr));
+        for (int i = 0; i < 3; i++) {
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        client.close();
+    }
+
+    @DataProvider(name = "autoCreationParamsForNonPersistentTopic")
+    public Object[][] autoCreationParamsForNonPersistentTopic(){
+        return new Object[][]{
+                // configAllowAutoTopicCreation, 
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
+                {true, true, true},
+                {true, true, false},
+                {false, true, true},
+                {false, true, false},
+                {false, false, true}
+        };
+    }
+
+    /**
+     * Regarding the API "get partitioned metadata" about non-persistent topic.
+     * The original behavior is:
+     *   param-auto-create = true, broker-config-auto-create = true
+     *     HTTP API: default configuration {@link 
ServiceConfiguration#getDefaultNumPartitions()}
+     *     binary API: default configuration {@link 
ServiceConfiguration#getDefaultNumPartitions()}
+     *   param-auto-create = true, broker-config-auto-create = false
+     *     HTTP API: {partitions: 0}
+     *     binary API: {partitions: 0}
+     *   param-auto-create = false
+     *     HTTP API: not found error
+     *     binary API: not support
+     *  This test only guarantees that the behavior is the same as before. The 
following separated PR will fix the
+     *  incorrect behavior.
+     */
+    @Test(dataProvider = "autoCreationParamsForNonPersistentTopic")
+    public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean 
configAllowAutoTopicCreation,
+                                                  boolean 
paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) 
throws Exception {
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
+        setup();
+
+        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
+        int lookupPermitsBefore = semaphore.availablePermits();
+
+        LookupService lookup = getLookupService(isUsingHttpLookup);
+        // Define topic.
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
+        final TopicName topicName = TopicName.get(topicNameStr);
+        // Verify.
+        // Regarding non-persistent topic, we do not know whether it exists or 
not.
+        // Broker will return a non-partitioned metadata if partitioned 
metadata does not exist.
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+
+        if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled 
&& isUsingHttpLookup) {
+            try {
+                
lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), 
paramMetadataAutoCreationEnabled)
+                        .join();
+                Assert.fail("Expected a not found ex");
+            } catch (Exception ex) {
+                // Cleanup.
+                client.close();
+                return;
+            }
+        }
+
+        PartitionedTopicMetadata metadata = lookup
+                .getPartitionedTopicMetadata(TopicName.get(topicNameStr), 
paramMetadataAutoCreationEnabled).join();
+        if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
+            assertEquals(metadata.partitions, 3);
+        } else {
+            assertEquals(metadata.partitions, 0);
+        }
+
+        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
+        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .partitionedTopicExists(topicName);
+        if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
+            assertTrue(partitionedTopics.contains(topicNameStr));
+        } else {
+            assertFalse(partitionedTopics.contains(topicNameStr));
+        }
+
+        // Verify: lookup semaphore has been releases.
+        Awaitility.await().untilAsserted(() -> {
+            int lookupPermitsAfter = semaphore.availablePermits();
+            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+        });
+
+        // Cleanup.
+        client.close();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index bb4a23bf24b..55601ad4c6b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -133,7 +134,7 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
             // we want to skip the "lookup" phase, because it is blocked by 
the HTTP API
             LookupService mockLookup = mock(LookupService.class);
             ((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
-            when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
+            when(mockLookup.getPartitionedTopicMetadata(any(), 
anyBoolean())).thenAnswer(
                     i -> CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0)));
             when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
                 InetSocketAddress brokerAddress =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 0a6cffc7685..ea5365bcf4b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -566,13 +566,13 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         try {
             
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create();
             Assert.fail("Create should have failed.");
-        } catch (PulsarClientException.TopicDoesNotExistException e) {
+        } catch (PulsarClientException.TopicDoesNotExistException | 
PulsarClientException.NotFoundException e) {
             // expected
         }
         try {
             
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create();
             Assert.fail("Create should have failed.");
-        } catch (PulsarClientException.TopicDoesNotExistException e) {
+        } catch (PulsarClientException.TopicDoesNotExistException | 
PulsarClientException.NotFoundException e) {
             // expected
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 442f850bbb9..172842b5ed3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1038,12 +1038,12 @@ public class BrokerServiceTest extends BrokerTestBase {
             // for PMR
             // 2 lookup will succeed
             long reqId1 = reqId++;
-            ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, 
reqId1);
+            ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, 
reqId1, true);
             CompletableFuture<?> f1 = 
pool.getConnection(resolver.resolveHost())
                 .thenCompose(clientCnx -> clientCnx.newLookup(request1, 
reqId1));
 
             long reqId2 = reqId++;
-            ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, 
reqId2);
+            ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, 
reqId2, true);
             CompletableFuture<?> f2 = 
pool.getConnection(resolver.resolveHost())
                 .thenCompose(clientCnx -> {
                     CompletableFuture<?> future = 
clientCnx.newLookup(request2, reqId2);
@@ -1058,17 +1058,17 @@ public class BrokerServiceTest extends BrokerTestBase {
             // 3 lookup will fail
             latchRef.set(new CountDownLatch(1));
             long reqId3 = reqId++;
-            ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, 
reqId3);
+            ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, 
reqId3, true);
             f1 = pool.getConnection(resolver.resolveHost())
                 .thenCompose(clientCnx -> clientCnx.newLookup(request3, 
reqId3));
 
             long reqId4 = reqId++;
-            ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, 
reqId4);
+            ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, 
reqId4, true);
             f2 = pool.getConnection(resolver.resolveHost())
                 .thenCompose(clientCnx -> clientCnx.newLookup(request4, 
reqId4));
 
             long reqId5 = reqId++;
-            ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, 
reqId5);
+            ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, 
reqId5, true);
             CompletableFuture<?> f3 = 
pool.getConnection(resolver.resolveHost())
                 .thenCompose(clientCnx -> {
                     CompletableFuture<?> future = 
clientCnx.newLookup(request5, reqId5);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
index 312bfe0fc8a..c6a94833c4c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
@@ -198,7 +198,7 @@ public class BrokerServiceThrottlingTest extends 
BrokerTestBase {
             for (int i = 0; i < totalConsumers; i++) {
                 long reqId = 0xdeadbeef + i;
                 Future<?> f = executor.submit(() -> {
-                        ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName, reqId);
+                        ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName, reqId, true);
                         pool.getConnection(resolver.resolveHost())
                             .thenCompose(clientCnx -> 
clientCnx.newLookup(request, reqId))
                             .get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1cb2f76c5e2..5387bc4998c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -3571,7 +3571,7 @@ public class ServerCnxTest {
         doReturn(false).when(pulsar).isRunning();
         assertTrue(channel.isActive());
 
-        ByteBuf clientCommand = 
Commands.newPartitionMetadataRequest(successTopicName, 1);
+        ByteBuf clientCommand = 
Commands.newPartitionMetadataRequest(successTopicName, 1, true);
         channel.writeInbound(clientCommand);
         Object response = getResponse();
         assertTrue(response instanceof 
CommandPartitionedTopicMetadataResponse);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index aa7240a59f9..6e121aca381 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -148,7 +148,7 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()
-                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
+                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
false).get();
         Transaction lowWaterMarkTxn = null;
         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
             lowWaterMarkTxn = pulsarClient.newTransaction()
@@ -253,7 +253,7 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()
-                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
+                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
false).get();
         Transaction lowWaterMarkTxn = null;
         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
             lowWaterMarkTxn = pulsarClient.newTransaction()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 0ad0b01dc1c..336728f279e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -931,7 +931,7 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         // Verify the request is works after merge the requests.
         List<CompletableFuture<PartitionedTopicMetadata>> futures = new 
ArrayList<>();
         for (int i = 0; i < 100; i++) {
-            
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName)));
+            
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), 
false));
         }
         for (CompletableFuture<PartitionedTopicMetadata> future : futures) {
             assertEquals(future.join().partitions, topicPartitions);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 78952fcaed8..6c46bce254f 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -308,14 +308,33 @@ public interface PulsarClient extends Closeable {
      *
      * <p>This can be used to discover the partitions and create {@link 
Reader}, {@link Consumer} or {@link Producer}
      * instances directly on a particular partition.
-     *
+     * @Deprecated it is not suggested to use now; please use {@link 
#getPartitionsForTopic(String, boolean)}.
      * @param topic
      *            the topic name
      * @return a future that will yield a list of the topic partitions or 
{@link PulsarClientException} if there was any
      *         error in the operation.
+     *
      * @since 2.3.0
      */
-    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+    @Deprecated
+    default CompletableFuture<List<String>> getPartitionsForTopic(String 
topic) {
+        return getPartitionsForTopic(topic, true);
+    }
+
+    /**
+     * 1. Get the partitions if the topic exists. Return "[{partition-0}, 
{partition-1}....{partition-n}}]" if a
+     *   partitioned topic exists; return "[{topic}]" if a non-partitioned 
topic exists.
+     * 2. When {@param metadataAutoCreationEnabled} is "false", neither the 
partitioned topic nor non-partitioned
+     *   topic does not exist. You will get an {@link 
PulsarClientException.NotFoundException} or a
+     *   {@link PulsarClientException.TopicDoesNotExistException}.
+     *  2-1. You will get a {@link 
PulsarClientException.NotSupportedException} with 
metadataAutoCreationEnabled=false
+     *    on an old broker version which does not support getting partitions 
without partitioned metadata auto-creation.
+     * 3. When {@param metadataAutoCreationEnabled} is "true," it will trigger 
an auto-creation for this topic(using
+     *   the default topic auto-creation strategy you set for the broker), and 
the corresponding result is returned.
+     *   For the result, see case 1.
+     * @version 3.3.0.
+     */
+    CompletableFuture<List<String>> getPartitionsForTopic(String topic, 
boolean metadataAutoCreationEnabled);
 
     /**
      * Close the PulsarClient and release all the resources.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 9c9371b09cb..b363d6e4366 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -144,12 +144,14 @@ public class BinaryProtoLookupService implements 
LookupService {
      * calls broker binaryProto-lookup api to get metadata of 
partitioned-topic.
      *
      */
-    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName) {
+    @Override
+    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
+            TopicName topicName, boolean metadataAutoCreationEnabled) {
         final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
         try {
             return partitionedMetadataInProgress.computeIfAbsent(topicName, 
tpName -> {
-                CompletableFuture<PartitionedTopicMetadata> newFuture =
-                        
getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
+                CompletableFuture<PartitionedTopicMetadata> newFuture = 
getPartitionedTopicMetadata(
+                        serviceNameResolver.resolveHost(), topicName, 
metadataAutoCreationEnabled);
                 newFutureCreated.setValue(newFuture);
                 return newFuture;
             });
@@ -246,14 +248,15 @@ public class BinaryProtoLookupService implements 
LookupService {
     }
 
     private CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(InetSocketAddress socketAddress,
-            TopicName topicName) {
+            TopicName topicName, boolean metadataAutoCreationEnabled) {
 
         long startTime = System.nanoTime();
         CompletableFuture<PartitionedTopicMetadata> partitionFuture = new 
CompletableFuture<>();
 
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
             long requestId = client.newRequestId();
-            ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
+            ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
+                    metadataAutoCreationEnabled);
             clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
                 if (t != null) {
                     histoGetTopicMetadata.recordFailure(System.nanoTime() - 
startTime);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 7686d0072cf..7735f66e783 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -136,9 +136,9 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
             if (deadLetterPolicy == null || 
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
                     || 
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
                 CompletableFuture<PartitionedTopicMetadata> 
retryLetterTopicMetadata =
-                        
client.getPartitionedTopicMetadata(oldRetryLetterTopic);
+                        
client.getPartitionedTopicMetadata(oldRetryLetterTopic, true);
                 CompletableFuture<PartitionedTopicMetadata> 
deadLetterTopicMetadata =
-                        client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+                        client.getPartitionedTopicMetadata(oldDeadLetterTopic, 
true);
                 applyDLQConfig = 
CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
                         .thenAccept(__ -> {
                             String retryLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName()
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index ea13930411d..44ef4ac17ee 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -136,12 +136,14 @@ public class HttpLookupService implements LookupService {
     }
 
     @Override
-    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName) {
+    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
+            TopicName topicName, boolean metadataAutoCreationEnabled) {
         long startTime = System.nanoTime();
 
         String format = topicName.isV2() ? "admin/v2/%s/partitions" : 
"admin/%s/partitions";
         CompletableFuture<PartitionedTopicMetadata> httpFuture =  
httpClient.get(
-                String.format(format, topicName.getLookupName()) + 
"?checkAllowAutoCreation=true",
+                String.format(format, topicName.getLookupName()) + 
"?checkAllowAutoCreation="
+                        + metadataAutoCreationEnabled,
                 PartitionedTopicMetadata.class);
 
         httpFuture.thenRun(() -> {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 4d59d6591db..ccd1f6b23f2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -60,11 +60,30 @@ public interface LookupService extends AutoCloseable {
 
     /**
      * Returns {@link PartitionedTopicMetadata} for a given topic.
-     *
-     * @param topicName topic-name
-     * @return
+     * Note: this method will try to create the topic partitioned metadata if 
it does not exist.
+     * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, 
boolean)}}.
+     */
+    @Deprecated
+    default CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName) {
+        return getPartitionedTopicMetadata(topicName, true);
+    }
+
+    /**
+     * 1.Get the partitions if the topic exists. Return "{partition: n}" if a 
partitioned topic exists;
+     *  return "{partition: 0}" if a non-partitioned topic exists.
+     * 2. When {@param metadataAutoCreationEnabled} is "false," neither 
partitioned topic nor non-partitioned topic
+     *   does not exist. You will get a {@link 
PulsarClientException.NotFoundException} or
+     *   a {@link PulsarClientException.TopicDoesNotExistException}.
+     *  2-1. You will get a {@link 
PulsarClientException.NotSupportedException} with 
metadataAutoCreationEnabled=false
+     *       on an old broker version which does not support getting 
partitions without partitioned metadata
+     *       auto-creation.
+     * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger 
an auto-creation for this topic(using
+     *  the default topic auto-creation strategy you set for the broker), and 
the corresponding result is returned.
+     *  For the result, see case 1.
+     * @version 3.3.0.
      */
-    CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName);
+    CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName,
+                                                                            
boolean metadataAutoCreationEnabled);
 
     /**
      * Returns current SchemaInfo {@link SchemaInfo} for a given topic.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 20fd03d6a28..8047e05351a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -954,7 +954,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
 
-        client.getPartitionedTopicMetadata(topicName)
+        client.getPartitionedTopicMetadata(topicName, true)
                 .thenAccept(metadata -> 
subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
                     createTopicIfDoesNotExist))
                 .exceptionally(ex1 -> {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1e3964ed750..b93786bb4b1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -387,7 +387,7 @@ public class PulsarClientImpl implements PulsarClient {
                                                                    
ProducerInterceptors interceptors) {
         CompletableFuture<Producer<T>> producerCreatedFuture = new 
CompletableFuture<>();
 
-        getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
+        getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
@@ -529,7 +529,7 @@ public class PulsarClientImpl implements PulsarClient {
 
         String topic = conf.getSingleTopic();
 
-        getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
+        getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
@@ -669,7 +669,7 @@ public class PulsarClientImpl implements PulsarClient {
 
         CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
 
-        getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
+        getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
@@ -1090,11 +1090,8 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
-    public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
-        return getPartitionedTopicMetadata(topic).thenApply(metadata -> 
metadata.partitions);
-    }
-
-    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(String topic) {
+    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
+            String topic, boolean metadataAutoCreationEnabled) {
 
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
 
@@ -1107,7 +1104,7 @@ public class PulsarClientImpl implements PulsarClient {
                     .setMax(conf.getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
                     .create();
             getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
-                                        metadataFuture, new ArrayList<>());
+                                        metadataFuture, new ArrayList<>(), 
metadataAutoCreationEnabled);
         } catch (IllegalArgumentException e) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(e.getMessage()));
         }
@@ -1118,15 +1115,19 @@ public class PulsarClientImpl implements PulsarClient {
                                              Backoff backoff,
                                              AtomicLong remainingTime,
                                              
CompletableFuture<PartitionedTopicMetadata> future,
-                                             List<Throwable> 
previousExceptions) {
+                                             List<Throwable> 
previousExceptions,
+                                             boolean 
metadataAutoCreationEnabled) {
         long startTime = System.nanoTime();
-        
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e
 -> {
+        CompletableFuture<PartitionedTopicMetadata> queryFuture =
+                lookup.getPartitionedTopicMetadata(topicName, 
metadataAutoCreationEnabled);
+        queryFuture.thenAccept(future::complete).exceptionally(e -> {
             remainingTime.addAndGet(-1 * 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             // skip retry scheduler when set lookup throttle in client or 
server side which will lead to
             // `TooManyRequestsException`
             boolean isLookupThrottling = 
!PulsarClientException.isRetriableError(e.getCause())
-                || e.getCause() instanceof 
PulsarClientException.AuthenticationException;
+                || e.getCause() instanceof 
PulsarClientException.AuthenticationException
+                || e.getCause() instanceof 
PulsarClientException.NotFoundException;
             if (nextDelay <= 0 || isLookupThrottling) {
                 PulsarClientException.setPreviousExceptions(e, 
previousExceptions);
                 future.completeExceptionally(e);
@@ -1138,15 +1139,16 @@ public class PulsarClientImpl implements PulsarClient {
                 log.warn("[topic: {}] Could not get connection while 
getPartitionedTopicMetadata -- "
                         + "Will try again in {} ms", topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
-                getPartitionedTopicMetadata(topicName, backoff, remainingTime, 
future, previousExceptions);
+                getPartitionedTopicMetadata(topicName, backoff, remainingTime, 
future, previousExceptions,
+                        metadataAutoCreationEnabled);
             }, nextDelay, TimeUnit.MILLISECONDS);
             return null;
         });
     }
 
     @Override
-    public CompletableFuture<List<String>> getPartitionsForTopic(String topic) 
{
-        return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
+    public CompletableFuture<List<String>> getPartitionsForTopic(String topic, 
boolean metadataAutoCreationEnabled) {
+        return getPartitionedTopicMetadata(topic, 
metadataAutoCreationEnabled).thenApply(metadata -> {
             if (metadata.partitions > 0) {
                 TopicName topicName = TopicName.get(topic);
                 List<String> partitions = new ArrayList<>(metadata.partitions);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 9e79fc203c2..499627f9c73 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -79,7 +79,8 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
     @Override
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
-            return 
pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
+            return pulsarClient.getLookup()
+                
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
true)
                 .thenCompose(partitionMeta -> {
                     List<CompletableFuture<Void>> connectFutureList = new 
ArrayList<>();
                     if (LOG.isDebugEnabled()) {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index febec2bff32..191124bb7b0 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedComp
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -153,7 +154,8 @@ public class MultiTopicsConsumerImplTest {
         int completionDelayMillis = 100;
         Schema<byte[]> schema = Schema.BYTES;
         PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
-        
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createDelayedCompletedFuture(
+        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+                .thenAnswer(invocation -> createDelayedCompletedFuture(
                 new PartitionedTopicMetadata(), completionDelayMillis));
         MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(
                 clientMock, consumerConfData, executorProvider,
@@ -201,7 +203,8 @@ public class MultiTopicsConsumerImplTest {
         int completionDelayMillis = 10;
         Schema<byte[]> schema = Schema.BYTES;
         PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
-        
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createExceptionFuture(
+        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+                .thenAnswer(invocation -> createExceptionFuture(
                 new PulsarClientException.InvalidConfigurationException("a 
mock exception"), completionDelayMillis));
         CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
         MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData,
@@ -237,7 +240,8 @@ public class MultiTopicsConsumerImplTest {
 
         // Simulate non partitioned topics
         PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);
-        
when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata));
+        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(metadata));
         CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
 
         MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<>(
@@ -248,7 +252,7 @@ public class MultiTopicsConsumerImplTest {
 
         // getPartitionedTopicMetadata should have been called only the first 
time, for each of the 3 topics,
         // but not anymore since the topics are not partitioned.
-        verify(clientMock, times(3)).getPartitionedTopicMetadata(any());
+        verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), 
anyBoolean());
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 274b9b4f2d5..3e897ed89f2 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.nullable;
@@ -107,7 +108,7 @@ public class PulsarClientImplTest {
                 nullable(String.class)))
                 .thenReturn(CompletableFuture.completedFuture(
                         new GetTopicsResult(Collections.emptyList(), null, 
false, true)));
-        when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
+        when(lookup.getPartitionedTopicMetadata(any(TopicName.class), 
anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata()));
         when(lookup.getBroker(any()))
                 .thenReturn(CompletableFuture.completedFuture(new 
LookupTopicResult(
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 65674af0ae1..53907e61914 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -190,6 +190,7 @@ public class Commands {
         flags.setSupportsAuthRefresh(true);
         flags.setSupportsBrokerEntryMetadata(true);
         flags.setSupportsPartialProducer(true);
+        flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
     }
 
     public static ByteBuf newConnect(String authMethodName, String authData, 
int protocolVersion, String libVersion,
@@ -910,11 +911,13 @@ public class Commands {
         return serializeWithSize(newPartitionMetadataResponseCommand(error, 
errorMsg, requestId));
     }
 
-    public static ByteBuf newPartitionMetadataRequest(String topic, long 
requestId) {
+    public static ByteBuf newPartitionMetadataRequest(String topic, long 
requestId,
+                                                      boolean 
metadataAutoCreationEnabled) {
         BaseCommand cmd = localCmd(Type.PARTITIONED_METADATA);
         cmd.setPartitionMetadata()
                 .setTopic(topic)
-                .setRequestId(requestId);
+                .setRequestId(requestId)
+                .setMetadataAutoCreationEnabled(metadataAutoCreationEnabled);
         return serializeWithSize(cmd);
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index f6fcb12f359..0628d494af3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -199,9 +199,9 @@ public class FutureUtil {
 
     public static Throwable unwrapCompletionException(Throwable ex) {
         if (ex instanceof CompletionException) {
-            return ex.getCause();
+            return unwrapCompletionException(ex.getCause());
         } else if (ex instanceof ExecutionException) {
-            return ex.getCause();
+            return unwrapCompletionException(ex.getCause());
         } else {
             return ex;
         }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 387e4e3ff67..5a7eb582eb5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -300,6 +300,7 @@ message FeatureFlags {
   optional bool supports_broker_entry_metadata = 2 [default = false];
   optional bool supports_partial_producer = 3 [default = false];
   optional bool supports_topic_watchers = 4 [default = false];
+  optional bool supports_get_partitioned_metadata_without_auto_creation = 5 
[default = false];
 }
 
 message CommandConnected {
@@ -413,6 +414,7 @@ message CommandPartitionedTopicMetadata {
     // to the proxy.
     optional string original_auth_data = 4;
     optional string original_auth_method = 5;
+    optional bool metadata_auto_creation_enabled = 6 [default = true];
 }
 
 message CommandPartitionedTopicMetadataResponse {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index f76adadcc3e..03975e153ac 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -241,7 +241,8 @@ public class LookupProxyHandler {
             // Connected to backend broker
             long requestId = proxyConnection.newRequestId();
             ByteBuf command;
-            command = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
+            command = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
+                    partitionMetadata.isMetadataAutoCreationEnabled());
             clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                 if (t != null) {
                     log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName.toString(),

Reply via email to