This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aed73653e1 [fix] [broker] response not-found error if topic does not 
exist when calling getPartitionedTopicMetadata (#22838)
9aed73653e1 is described below

commit 9aed73653e1f706e3517072cce4a352d0838f8d7
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jun 17 23:39:08 2024 +0800

    [fix] [broker] response not-found error if topic does not exist when 
calling getPartitionedTopicMetadata (#22838)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  21 +-
 .../broker/admin/v2/NonPersistentTopics.java       |  16 +-
 .../pulsar/broker/lookup/TopicLookupBase.java      |  22 +-
 .../pulsar/broker/namespace/NamespaceService.java  | 101 +++-
 .../pulsar/broker/namespace/TopicExistsInfo.java   |  82 +++
 .../pulsar/broker/service/BrokerService.java       | 117 ++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  81 +--
 .../admin/GetPartitionMetadataMultiBrokerTest.java | 222 ++++++++
 .../broker/admin/GetPartitionMetadataTest.java     | 608 +++++++++++----------
 .../org/apache/pulsar/broker/admin/TopicsTest.java |  13 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |  19 +-
 .../broker/namespace/NamespaceServiceTest.java     |   7 +-
 .../apache/pulsar/broker/service/TopicGCTest.java  |   2 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  37 +-
 14 files changed, 899 insertions(+), 449 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2f2a899950a..beb8ecc8d79 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -561,13 +561,13 @@ public class PersistentTopicsBase extends AdminResource {
                         // is a non-partitioned topic so we shouldn't check if 
the topic exists.
                         return 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
                                 .thenCompose(brokerAllowAutoTopicCreation -> {
-                            if (checkAllowAutoCreation) {
+                            if (checkAllowAutoCreation && 
brokerAllowAutoTopicCreation) {
                                 // Whether it exists or not, auto create a 
non-partitioned topic by client.
                                 return 
CompletableFuture.completedFuture(metadata);
                             } else {
                                 // If it does not exist, response a Not Found 
error.
                                 // Otherwise, response a non-partitioned 
metadata.
-                                return 
internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+                                return 
internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata);
                             }
                         });
                     }
@@ -715,6 +715,17 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Void> internalCheckTopicExists(TopicName 
topicName) {
         return pulsar().getNamespaceService().checkTopicExists(topicName)
+                .thenAccept(info -> {
+                    boolean exists = info.isExists();
+                    info.recycle();
+                    if (!exists) {
+                        throw new RestException(Status.NOT_FOUND, 
getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                });
+    }
+
+    protected CompletableFuture<Void> 
internalCheckNonPartitionedTopicExists(TopicName topicName) {
+        return 
pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
                 .thenAccept(exist -> {
                     if (!exist) {
                         throw new RestException(Status.NOT_FOUND, 
getTopicNotFoundErrorMessage(topicName.toString()));
@@ -5338,8 +5349,10 @@ public class PersistentTopicsBase extends AdminResource {
                             "Only persistent topic can be set as shadow 
topic"));
                 }
                 
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
-                        .thenAccept(isExists -> {
-                            if (!isExists) {
+                        .thenAccept(info -> {
+                            boolean exists = info.isExists();
+                            info.recycle();
+                            if (!exists) {
                                 throw new 
RestException(Status.PRECONDITION_FAILED,
                                         "Shadow topic [" + shadowTopic + "] 
not exists.");
                             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 5a7ea1b7632..9f58aa4ca9d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -98,8 +98,20 @@ public class NonPersistentTopics extends PersistentTopics {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Is check configuration required to 
automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") 
boolean checkAllowAutoCreation) {
-        super.getPartitionedMetadata(asyncResponse, tenant, namespace, 
encodedTopic, authoritative,
-                checkAllowAutoCreation);
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicOwnershipAsync(topicName, authoritative).whenComplete((__, ex) -> {
+            if (ex != null) {
+                Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+                if (isNot307And404Exception(actEx)) {
+                    log.error("[{}] Failed to get partitioned metadata for topic {}", clientAppId(), topicName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, actEx);
+            } else {
+                // Ownership check passed. "super.getPartitionedMetadata" will handle error itself.
+                super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
+                        checkAllowAutoCreation);
+            }
+        });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 7b2c7774148..9a05c3d992a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -67,16 +67,22 @@ public class TopicLookupBase extends PulsarWebResource {
                 .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
                 .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.LOOKUP, null))
                 .thenCompose(__ -> {
+                    // Case-1: Non-persistent topic.
                     // Currently, it's hard to check the 
non-persistent-non-partitioned topic, because it only exists
                     // in the broker, it doesn't have metadata. If the topic 
is non-persistent and non-partitioned,
-                    // we'll return the true flag.
-                    CompletableFuture<Boolean> existFuture = 
(!topicName.isPersistent() && !topicName.isPartitioned())
-                            ? CompletableFuture.completedFuture(true)
-                            : 
pulsar().getNamespaceService().checkTopicExists(topicName)
-                                .thenCompose(exists -> exists ? 
CompletableFuture.completedFuture(true)
-                                        : 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));
-
-                    return existFuture;
+                    // we'll return the true flag. So either it is a 
partitioned topic or not, the result will be true.
+                    if (!topicName.isPersistent()) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    // Case-2: Persistent topic.
+                    return 
pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
+                        boolean exists = info.isExists();
+                        info.recycle();
+                        if (exists) {
+                            return CompletableFuture.completedFuture(true);
+                        }
+                        return 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName);
+                    });
                 })
                 .thenCompose(exist -> {
                     if (!exist) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 80559b736c6..9df2b09204c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -51,6 +51,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -72,6 +73,7 @@ import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -123,6 +125,7 @@ import org.slf4j.LoggerFactory;
  *
  * @see org.apache.pulsar.broker.PulsarService
  */
+@Slf4j
 public class NamespaceService implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(NamespaceService.class);
 
@@ -1400,40 +1403,86 @@ public class NamespaceService implements AutoCloseable {
                 });
     }
 
-    public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
-        CompletableFuture<Boolean> future;
-        // If the topic is persistent and the name includes `-partition-`, 
find the topic from the managed/ledger.
-        if (topic.isPersistent() && topic.isPartitioned()) {
-            future = 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+    /**
+     * Check whether the topic exists (partitioned or non-partitioned).
+     */
+    public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName 
topic) {
+        return pulsar.getBrokerService()
+            
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
+            .thenCompose(metadata -> {
+                if (metadata.partitions > 0) {
+                    return CompletableFuture.completedFuture(
+                            
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
+                }
+                return checkNonPartitionedTopicExists(topic)
+                    .thenApply(b -> b ? 
TopicExistsInfo.newNonPartitionedTopicExists()
+                            : TopicExistsInfo.newTopicNotExists());
+            });
+    }
+
+    /**
+     * Check whether the non-partitioned topic exists.
+     */
+    public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName 
topic) {
+        if (topic.isPersistent()) {
+            return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
         } else {
-            future = CompletableFuture.completedFuture(false);
+            return 
checkNonPersistentNonPartitionedTopicExists(topic.toString());
         }
+    }
 
-        return future.thenCompose(found -> {
-            if (found != null && found) {
-                return CompletableFuture.completedFuture(true);
+    /**
+     * A non-persistent, non-partitioned topic has no metadata, so only its owner broker knows whether it exists.
+     * If this broker owns the topic, check the local broker service; otherwise look up the owner broker and
+     * request the topic's partitioned metadata from it, treating a not-found error as "does not exist".
+     */
+    public CompletableFuture<Boolean> 
checkNonPersistentNonPartitionedTopicExists(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        // "non-partitioned & non-persistent" topics only exist on the owner 
broker.
+        return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned 
-> {
+            // The current broker is the owner.
+            if (isOwned) {
+               CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = 
pulsar.getBrokerService()
+                       .getTopic(topic, false);
+               if (nonPersistentTopicFuture != null) {
+                   return 
nonPersistentTopicFuture.thenApply(Optional::isPresent);
+               } else {
+                   return CompletableFuture.completedFuture(false);
+               }
             }
 
-            return pulsar.getBrokerService()
-                    
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
-                    .thenCompose(metadata -> {
-                        if (metadata.partitions > 0) {
-                            return CompletableFuture.completedFuture(true);
-                        }
-
-                        if (topic.isPersistent()) {
-                            return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
-                        } else {
-                            // The non-partitioned non-persistent topic only 
exist in the broker topics.
-                            CompletableFuture<Optional<Topic>> 
nonPersistentTopicFuture =
-                                    
pulsar.getBrokerService().getTopics().get(topic.toString());
-                            if (nonPersistentTopicFuture == null) {
+            // Forward to the owner broker.
+            PulsarClientImpl pulsarClient;
+            try {
+                pulsarClient = (PulsarClientImpl) pulsar.getClient();
+            } catch (Exception ex) {
+                // This error will never occur.
+                log.error("{} Failed to get partition metadata due to create 
internal admin client fails", topic, ex);
+                return FutureUtil.failedFuture(ex);
+            }
+            LookupOptions lookupOptions = 
LookupOptions.builder().readOnly(false).authoritative(true).build();
+            return getBrokerServiceUrlAsync(TopicName.get(topic), 
lookupOptions)
+                .thenCompose(lookupResult -> {
+                    if (!lookupResult.isPresent()) {
+                        log.error("{} Failed to get partition metadata due can 
not find the owner broker", topic);
+                        return FutureUtil.failedFuture(new 
ServiceUnitNotReadyException(
+                                "No broker was available to own " + 
topicName));
+                    }
+                    return 
pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
+                        .getPartitionedTopicMetadata(topicName, false)
+                        .thenApply(metadata -> true)
+                        .exceptionallyCompose(ex -> {
+                            Throwable actEx = 
FutureUtil.unwrapCompletionException(ex);
+                            if (actEx instanceof 
PulsarClientException.NotFoundException
+                                    || actEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                                    || actEx instanceof 
PulsarAdminException.NotFoundException) {
                                 return 
CompletableFuture.completedFuture(false);
                             } else {
-                                return 
nonPersistentTopicFuture.thenApply(Optional::isPresent);
+                                log.error("{} Failed to get partition metadata 
due to redirecting fails", topic, ex);
+                                return CompletableFuture.failedFuture(ex);
                             }
-                        }
-                    });
+                        });
+            });
         });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
new file mode 100644
index 00000000000..1c3f117719e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import io.netty.util.Recycler;
+import lombok.Getter;
+import org.apache.pulsar.common.policies.data.TopicType;
+
+/**
+ * Result of a topic existence check: whether the topic exists and how many partitions it has.
+ *
+ * <p>Results for partitioned topics are taken from a Netty {@link Recycler}; callers should copy out the
+ * values they need and then call {@link #recycle()}. The "non-partitioned topic exists" and "topic does not
+ * exist" results are shared instances for which {@link #recycle()} does nothing.
+ */
+public class TopicExistsInfo {
+
+    private static final Recycler<TopicExistsInfo> RECYCLER = new Recycler<>() {
+        @Override
+        protected TopicExistsInfo newObject(Handle<TopicExistsInfo> handle) {
+            return new TopicExistsInfo(handle);
+        }
+    };
+
+    /** Shared result for an existing non-partitioned topic; it has no recycler handle. */
+    private static final TopicExistsInfo NON_PARTITIONED_EXISTS = new TopicExistsInfo(true, 0);
+
+    /** Shared result for a topic that does not exist; it has no recycler handle. */
+    private static final TopicExistsInfo NOT_EXISTS = new TopicExistsInfo(false, 0);
+
+    /**
+     * Obtain a pooled result describing an existing partitioned topic.
+     *
+     * @param partitions the partition count to report; must not be {@code null}
+     * @return a pooled instance that should be released with {@link #recycle()} after use
+     */
+    public static TopicExistsInfo newPartitionedTopicExists(Integer partitions) {
+        TopicExistsInfo info = RECYCLER.get();
+        info.exists = true;
+        info.partitions = partitions.intValue();
+        return info;
+    }
+
+    /**
+     * Get the shared result meaning "the topic exists and is non-partitioned".
+     */
+    public static TopicExistsInfo newNonPartitionedTopicExists() {
+        return NON_PARTITIONED_EXISTS;
+    }
+
+    /**
+     * Get the shared result meaning "the topic does not exist".
+     */
+    public static TopicExistsInfo newTopicNotExists() {
+        return NOT_EXISTS;
+    }
+
+    /** Recycler handle of a pooled instance; {@code null} for the shared instances. */
+    private final Recycler.Handle<TopicExistsInfo> handle;
+
+    @Getter
+    private int partitions;
+    @Getter
+    private boolean exists;
+
+    private TopicExistsInfo(Recycler.Handle<TopicExistsInfo> handle) {
+        this.handle = handle;
+    }
+
+    private TopicExistsInfo(boolean exists, int partitions) {
+        this.handle = null;
+        this.partitions = partitions;
+        this.exists = exists;
+    }
+
+    /**
+     * Reset this instance and return it to the pool. Does nothing for the shared instances, which are
+     * constructed without a handle.
+     */
+    public void recycle() {
+        if (this.handle == null) {
+            return;
+        }
+        this.exists = false;
+        this.partitions = 0;
+        this.handle.recycle(this);
+    }
+
+    /**
+     * Get the topic type implied by the partition count: {@link TopicType#PARTITIONED} when it is positive,
+     * otherwise {@link TopicType#NON_PARTITIONED} (which is also what a "not exists" result reports).
+     */
+    public TopicType getTopicType() {
+        return this.partitions > 0 ? TopicType.PARTITIONED : TopicType.NON_PARTITIONED;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 82d7fad3874..6ecd0a1ba60 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3178,65 +3178,66 @@ public class BrokerService implements Closeable {
         if (pulsar.getNamespaceService() == null) {
             return FutureUtil.failedFuture(new NamingException("namespace 
service is not ready"));
         }
-        return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(policies -> 
pulsar.getNamespaceService().checkTopicExists(topicName)
-                    .thenCompose(topicExists -> 
fetchPartitionedTopicMetadataAsync(topicName)
-                            .thenCompose(metadata -> {
-                                CompletableFuture<PartitionedTopicMetadata> 
future = new CompletableFuture<>();
-
-                                // There are a couple of potentially blocking 
calls, which we cannot make from the
-                                // MetadataStore callback thread.
-                                pulsar.getExecutor().execute(() -> {
-                                    // If topic is already exist, creating 
partitioned topic is not allowed.
-
-                                    if (metadata.partitions == 0
-                                            && !topicExists
-                                            && !topicName.isPartitioned()
-                                            && pulsar.getBrokerService()
-                                            
.isDefaultTopicTypePartitioned(topicName, policies)) {
-                                        
isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> {
-                                            if (allowed) {
-                                                pulsar.getBrokerService()
-                                                        
.createDefaultPartitionedTopicAsync(topicName, policies)
-                                                        .thenAccept(md -> 
future.complete(md))
-                                                        .exceptionally(ex -> {
-                                                            if (ex.getCause()
-                                                                    instanceof 
MetadataStoreException
-                                                                    
.AlreadyExistsException) {
-                                                                log.info("[{}] 
The partitioned topic is already"
-                                                                        + " 
created, try to refresh the cache and read"
-                                                                        + " 
again.", topicName);
-                                                                // The 
partitioned topic might be created concurrently
-                                                                
fetchPartitionedTopicMetadataAsync(topicName, true)
-                                                                        
.whenComplete((metadata2, ex2) -> {
-                                                                            if 
(ex2 == null) {
-                                                                               
 future.complete(metadata2);
-                                                                            } 
else {
-                                                                               
 future.completeExceptionally(ex2);
-                                                                            }
-                                                                        });
-                                                            } else {
-                                                                
log.error("[{}] operation of creating partitioned"
-                                                                        + " 
topic metadata failed",
-                                                                        
topicName, ex);
-                                                                
future.completeExceptionally(ex);
-                                                            }
-                                                            return null;
-                                                        });
-                                            } else {
-                                                future.complete(metadata);
-                                            }
-                                        }).exceptionally(ex -> {
-                                            future.completeExceptionally(ex);
-                                            return null;
-                                        });
-                                    } else {
-                                        future.complete(metadata);
-                                    }
-                                });
+        return 
pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo
 -> {
+            final boolean topicExists = topicExistsInfo.isExists();
+            final TopicType topicType = topicExistsInfo.getTopicType();
+            final Integer partitions = topicExistsInfo.getPartitions();
+            topicExistsInfo.recycle();
+
+            // Topic exists.
+            if (topicExists) {
+                if (topicType.equals(TopicType.PARTITIONED)) {
+                    return CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(partitions));
+                }
+                return CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0));
+            }
 
-                                return future;
-                            })));
+            // Topic does not exist. Try to create a partitioned topic automatically if that is allowed.
+            return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+                .thenComposeAsync(policies -> {
+                    return isAllowAutoTopicCreationAsync(topicName, 
policies).thenComposeAsync(allowed -> {
+                        // Not Allow auto-creation.
+                        if (!allowed) {
+                            // Keep the original behavior: by default, return non-partitioned topic metadata.
+                            return CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0));
+                        }
+
+                        // Allow auto create non-partitioned topic.
+                        boolean autoCreatePartitionedTopic = 
pulsar.getBrokerService()
+                                .isDefaultTopicTypePartitioned(topicName, 
policies);
+                        if (!autoCreatePartitionedTopic || 
topicName.isPartitioned()) {
+                            return CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0));
+                        }
+
+                        // Create partitioned metadata.
+                        return 
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, 
policies)
+                            .exceptionallyCompose(ex -> {
+                                // The partitioned topic might be created 
concurrently.
+                                if (ex.getCause() instanceof 
MetadataStoreException.AlreadyExistsException) {
+                                    log.info("[{}] The partitioned topic is 
already created, try to refresh the cache"
+                                            + " and read again.", topicName);
+                                    
CompletableFuture<PartitionedTopicMetadata> recheckFuture =
+                                            
fetchPartitionedTopicMetadataAsync(topicName, true);
+                                    recheckFuture.exceptionally(ex2 -> {
+                                        // Log only; the failure still reaches the caller through recheckFuture.
+                                        log.error("[{}] Fetch partitioned topic metadata failed", topicName, ex2);
+                                        return null;
+                                    });
+                                    return recheckFuture;
+                                } else {
+                                    log.error("[{}] operation of creating 
partitioned topic metadata failed",
+                                            topicName, ex);
+                                    return CompletableFuture.failedFuture(ex);
+                                }
+                            });
+                    }, pulsar.getExecutor()).exceptionallyCompose(ex -> {
+                        log.error("[{}] operation of get partitioned metadata 
failed due to calling"
+                                        + " isAllowAutoTopicCreationAsync 
failed",
+                                topicName, ex);
+                        return CompletableFuture.failedFuture(ex);
+                    });
+            }, pulsar.getExecutor());
+        }, pulsar.getExecutor());
     }
 
     @SuppressWarnings("deprecation")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6901097bbbb..b184f794949 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -84,8 +84,7 @@ import org.apache.pulsar.broker.limiter.ConnectionController;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.namespace.LookupOptions;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -161,6 +160,7 @@ import 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.CommandUtils;
@@ -614,58 +614,33 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 if (isAuthorized) {
                     // Get if exists, respond not found error if not exists.
                     
getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate
 -> {
-                        boolean autoCreateIfNotExist = 
partitionMetadata.isMetadataAutoCreationEnabled();
+                        boolean autoCreateIfNotExist = 
partitionMetadata.isMetadataAutoCreationEnabled()
+                                && brokerAllowAutoCreate;
                         if (!autoCreateIfNotExist) {
-                            final NamespaceResources namespaceResources = 
getBrokerService().pulsar()
-                                    
.getPulsarResources().getNamespaceResources();
-                            final TopicResources topicResources = 
getBrokerService().pulsar().getPulsarResources()
-                                    .getTopicResources();
-                            namespaceResources.getPartitionedTopicResources()
-                                .getPartitionedTopicMetadataAsync(topicName, 
false)
-                                .handle((metadata, getMetadataEx) -> {
-                                    if (getMetadataEx != null) {
-                                        log.error("{} {} Failed to get 
partition metadata", topicName,
-                                                ServerCnx.this.toString(), 
getMetadataEx);
-                                        writeAndFlush(
-                                                
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
-                                                        "Failed to get 
partition metadata",
-                                                        requestId));
-                                    } else if (metadata.isPresent()) {
-                                        
commandSender.sendPartitionMetadataResponse(metadata.get().partitions,
-                                                requestId);
-                                    } else if (topicName.isPersistent()) {
-                                        
topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
-                                            if (exists) {
-                                                
commandSender.sendPartitionMetadataResponse(0, requestId);
-                                                return;
-                                            }
-                                            
writeAndFlush(Commands.newPartitionMetadataResponse(
-                                                    ServerError.TopicNotFound, 
"", requestId));
-                                        }).exceptionally(ex -> {
-                                            log.error("{} {} Failed to get 
partition metadata", topicName,
-                                                    ServerCnx.this.toString(), 
ex);
-                                            writeAndFlush(
-                                                    
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
-                                                            "Failed to check 
partition metadata",
-                                                            requestId));
-                                            return null;
-                                        });
-                                    } else {
-                                        // Regarding non-persistent topic, we 
do not know whether it exists or not.
-                                        // Just return a non-partitioned 
metadata if partitioned metadata does not
-                                        // exist.
-                                        // Broker will respond a not found 
error when doing subscribing or producing if
-                                        // broker not allow to auto create 
topics.
-                                        
commandSender.sendPartitionMetadataResponse(0, requestId);
-                                    }
-                                    return null;
-                                }).whenComplete((ignore, ignoreEx) -> {
-                                    lookupSemaphore.release();
-                                    if (ignoreEx != null) {
-                                        log.error("{} {} Failed to handle 
partition metadata request", topicName,
-                                                ServerCnx.this.toString(), 
ignoreEx);
-                                    }
-                                });
+                            NamespaceService namespaceService = 
getBrokerService().getPulsar().getNamespaceService();
+                            
namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> {
+                                lookupSemaphore.release();
+                                if (!topicExistsInfo.isExists()) {
+                                    
writeAndFlush(Commands.newPartitionMetadataResponse(
+                                            ServerError.TopicNotFound, "", 
requestId));
+                                } else if 
(topicExistsInfo.getTopicType().equals(TopicType.PARTITIONED)) {
+                                    
commandSender.sendPartitionMetadataResponse(topicExistsInfo.getPartitions(),
+                                            requestId);
+                                } else {
+                                    
commandSender.sendPartitionMetadataResponse(0, requestId);
+                                }
+                                // release resources.
+                                topicExistsInfo.recycle();
+                            }).exceptionally(ex -> {
+                                lookupSemaphore.release();
+                                log.error("{} {} Failed to get partition 
metadata", topicName,
+                                        ServerCnx.this.toString(), ex);
+                                writeAndFlush(
+                                        
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                                                "Failed to get partition 
metadata",
+                                                requestId));
+                                return null;
+                            });
                         } else {
                             // Get if exists, create a new one if not exists.
                             
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
new file mode 100644
index 00000000000..28cf91ee165
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.net.URL;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+@Slf4j
+public class GetPartitionMetadataMultiBrokerTest extends 
GetPartitionMetadataTest {
+
+    private PulsarService pulsar2;
+    private URL url2;
+    private PulsarAdmin admin2;
+    private PulsarClientImpl clientWithHttpLookup2;
+    private PulsarClientImpl clientWitBinaryLookup2;
+
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Override
+    protected void cleanupBrokers() throws Exception {
+        // Cleanup broker2.
+        if (clientWithHttpLookup2 != null) {
+            clientWithHttpLookup2.close();
+            clientWithHttpLookup2 = null;
+        }
+        if (clientWitBinaryLookup2 != null) {
+            clientWitBinaryLookup2.close();
+            clientWitBinaryLookup2 = null;
+        }
+        if (admin2 != null) {
+            admin2.close();
+            admin2 = null;
+        }
+        if (pulsar2 != null) {
+            pulsar2.close();
+            pulsar2 = null;
+        }
+
+        // Super cleanup.
+        super.cleanupBrokers();
+    }
+
+    @Override
+    protected void setupBrokers() throws Exception {
+        super.setupBrokers();
+        doInitConf();
+        pulsar2 = new PulsarService(conf);
+        pulsar2.start();
+        url2 = new URL(pulsar2.getWebServiceAddress());
+        admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+        clientWithHttpLookup2 =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar2.getWebServiceAddress()).build();
+        clientWitBinaryLookup2 =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build();
+    }
+
+    @Override
+    protected PulsarClientImpl[] getClientsToTest() {
+        return new PulsarClientImpl[] {clientWithHttpLookup1, 
clientWitBinaryLookup1,
+                clientWithHttpLookup2, clientWitBinaryLookup2};
+    }
+
+    protected PulsarClientImpl[] getClientsToTest(boolean isUsingHttpLookup) {
+        if (isUsingHttpLookup) {
+            return new PulsarClientImpl[]{clientWithHttpLookup1, 
clientWithHttpLookup2};
+        } else {
+            return new PulsarClientImpl[]{clientWitBinaryLookup1, 
clientWitBinaryLookup2};
+        }
+    }
+
+    @Override
+    protected int getLookupRequestPermits() {
+        return 
pulsar1.getBrokerService().getLookupRequestSemaphore().availablePermits()
+                + 
pulsar2.getBrokerService().getLookupRequestSemaphore().availablePermits();
+    }
+
+    protected void verifyPartitionsNeverCreated(String topicNameStr) throws 
Exception {
+        TopicName topicName = TopicName.get(topicNameStr);
+        try {
+            List<String> topicList = admin1.topics().getList("public/default");
+            for (int i = 0; i < 3; i++) {
+                assertFalse(topicList.contains(topicName.getPartition(i)));
+            }
+        } catch (Exception ex) {
+            // If the namespace bundle has not been loaded yet, it means no 
non-persistent topic was created. So
+            //   this behavior is also correct.
+            // This error is not expected, a separate PR is needed to fix 
this issue.
+            assertTrue(ex.getMessage().contains("Failed to find ownership 
for"));
+        }
+    }
+
+    protected void verifyNonPartitionedTopicNeverCreated(String topicNameStr) 
throws Exception {
+        TopicName topicName = TopicName.get(topicNameStr);
+        try {
+            List<String> topicList = admin1.topics().getList("public/default");
+            
assertFalse(topicList.contains(topicName.getPartitionedTopicName()));
+        } catch (Exception ex) {
+            // If the namespace bundle has not been loaded yet, it means no 
non-persistent topic was created. So
+            //   this behavior is also correct.
+            // This error is not expected, a separate PR is needed to fix 
this issue.
+            assertTrue(ex.getMessage().contains("Failed to find ownership 
for"));
+        }
+    }
+
+    protected void modifyTopicAutoCreation(boolean allowAutoTopicCreation,
+                                           TopicType 
allowAutoTopicCreationType,
+                                           int defaultNumPartitions) throws 
Exception {
+        doModifyTopicAutoCreation(admin1, pulsar1, allowAutoTopicCreation, 
allowAutoTopicCreationType,
+                defaultNumPartitions);
+        doModifyTopicAutoCreation(admin2, pulsar2, allowAutoTopicCreation, 
allowAutoTopicCreationType,
+                defaultNumPartitions);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "topicDomains")
+    public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain 
topicDomain) throws Exception {
+        super.testAutoCreatingMetadataWhenCallingOldAPI(topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsAll", enabled = false)
+    public void testGetMetadataIfNonPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                           boolean 
paramMetadataAutoCreationEnabled,
+                                                           boolean 
isUsingHttpLookup,
+                                                           TopicDomain 
topicDomain) throws Exception {
+        
super.testGetMetadataIfNonPartitionedTopicExists(configAllowAutoTopicCreation, 
paramMetadataAutoCreationEnabled,
+                isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc} Runs the parent's partitioned-topic scenario (not the non-partitioned one).
+     */
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfPartitionedTopicExists(boolean 
configAllowAutoTopicCreation,
+                                                        boolean 
paramMetadataAutoCreationEnabled,
+                                                        boolean 
isUsingHttpLookup,
+                                                        TopicDomain 
topicDomain) throws Exception {
+        
super.testGetMetadataIfPartitionedTopicExists(configAllowAutoTopicCreation, 
paramMetadataAutoCreationEnabled,
+                isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "clients")
+    public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, 
TopicDomain topicDomain) throws Exception {
+        super.testAutoCreatePartitionedTopic(isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "clients")
+    public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, 
TopicDomain topicDomain) throws Exception {
+        super.testAutoCreateNonPartitionedTopic(isUsingHttpLookup, 
topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsNotAllow")
+    public void testGetMetadataIfNotAllowedCreate(boolean 
configAllowAutoTopicCreation,
+                                                  boolean 
paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) 
throws Exception {
+        super.testGetMetadataIfNotAllowedCreate(configAllowAutoTopicCreation, 
paramMetadataAutoCreationEnabled,
+                isUsingHttpLookup);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsNotAllow")
+    public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean 
configAllowAutoTopicCreation,
+                                                  boolean 
paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) 
throws Exception {
+        
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
+                paramMetadataAutoCreationEnabled, isUsingHttpLookup);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index 51f643d2b78..bf99b172829 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -22,70 +22,150 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.Semaphore;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
 @Slf4j
-public class GetPartitionMetadataTest extends ProducerConsumerBase {
+public class GetPartitionMetadataTest {
 
-    private static final String DEFAULT_NS = "public/default";
+    protected static final String DEFAULT_NS = "public/default";
 
-    private PulsarClientImpl clientWithHttpLookup;
-    private PulsarClientImpl clientWitBinaryLookup;
+    protected String clusterName = "c1";
 
-    @Override
+    protected LocalBookkeeperEnsemble bkEnsemble;
+
+    protected ServiceConfiguration conf = new ServiceConfiguration();
+
+    protected PulsarService pulsar1;
+    protected URL url1;
+    protected PulsarAdmin admin1;
+    protected PulsarClientImpl clientWithHttpLookup1;
+    protected PulsarClientImpl clientWitBinaryLookup1;
+
+    @BeforeClass(alwaysRun = true)
     protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-        clientWithHttpLookup =
-                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
-        clientWitBinaryLookup =
-                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        // Start broker.
+        setupBrokers();
+        // Create default NS.
+        admin1.clusters().createCluster(clusterName, new ClusterDataImpl());
+        
admin1.tenants().createTenant(NamespaceName.get(DEFAULT_NS).getTenant(),
+                new TenantInfoImpl(Collections.emptySet(), 
Sets.newHashSet(clusterName)));
+        admin1.namespaces().createNamespace(DEFAULT_NS);
     }
 
-    @Override
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
-        super.internalCleanup();
-        if (clientWithHttpLookup != null) {
-            clientWithHttpLookup.close();
+        cleanupBrokers();
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+            bkEnsemble = null;
+        }
+    }
+
+    protected void cleanupBrokers() throws Exception {
+        // Cleanup broker1.
+        if (clientWithHttpLookup1 != null) {
+            clientWithHttpLookup1.close();
+            clientWithHttpLookup1 = null;
+        }
+        if (clientWitBinaryLookup1 != null) {
+            clientWitBinaryLookup1.close();
+            clientWitBinaryLookup1 = null;
         }
-        if (clientWitBinaryLookup != null) {
-            clientWitBinaryLookup.close();
+        if (admin1 != null) {
+            admin1.close();
+            admin1 = null;
         }
+        if (pulsar1 != null) {
+            pulsar1.close();
+            pulsar1 = null;
+        }
+        // Reset configs.
+        conf = new ServiceConfiguration();
+    }
+
+    protected void setupBrokers() throws Exception {
+        doInitConf();
+        // Start broker.
+        pulsar1 = new PulsarService(conf);
+        pulsar1.start();
+        url1 = new URL(pulsar1.getWebServiceAddress());
+        admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+        clientWithHttpLookup1 =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build();
+        clientWitBinaryLookup1 =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
     }
 
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
+    protected void doInitConf() {
+        conf.setClusterName(clusterName);
+        conf.setAdvertisedAddress("localhost");
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+        conf.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
+        conf.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort() + "/foo");
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
+        conf.setBrokerShutdownTimeoutMs(0L);
+        conf.setLoadBalancerSheddingEnabled(false);
     }
 
-    private LookupService getLookupService(boolean isUsingHttpLookup) {
+    protected PulsarClientImpl[] getClientsToTest() {
+        return new PulsarClientImpl[] {clientWithHttpLookup1, 
clientWitBinaryLookup1};
+    }
+
+    protected PulsarClientImpl[] getClientsToTest(boolean isUsingHttpLookup) {
         if (isUsingHttpLookup) {
-            return clientWithHttpLookup.getLookup();
+            return new PulsarClientImpl[] {clientWithHttpLookup1};
         } else {
-            return clientWitBinaryLookup.getLookup();
+            return new PulsarClientImpl[] {clientWitBinaryLookup1};
         }
+
+    }
+
+    protected int getLookupRequestPermits() {
+        return 
pulsar1.getBrokerService().getLookupRequestSemaphore().availablePermits();
+    }
+
+    protected void verifyPartitionsNeverCreated(String topicNameStr) throws 
Exception {
+        TopicName topicName = TopicName.get(topicNameStr);
+        List<String> topicList = admin1.topics().getList("public/default");
+        for (int i = 0; i < 3; i++) {
+            assertFalse(topicList.contains(topicName.getPartition(i)));
+        }
+    }
+
+    protected void verifyNonPartitionedTopicNeverCreated(String topicNameStr) 
throws Exception {
+        TopicName topicName = TopicName.get(topicNameStr);
+        List<String> topicList = admin1.topics().getList("public/default");
+        assertFalse(topicList.contains(topicName.getPartitionedTopicName()));
     }
 
     @DataProvider(name = "topicDomains")
@@ -96,43 +176,53 @@ public class GetPartitionMetadataTest extends 
ProducerConsumerBase {
         };
     }
 
-    @Test(dataProvider = "topicDomains")
-    public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain 
topicDomain) throws Exception {
-        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
-        conf.setDefaultNumPartitions(3);
-        conf.setAllowAutoTopicCreation(true);
-        setup();
-
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
-
-        // HTTP client.
-        final String tp1 = BrokerTestUtil.newUniqueName(topicDomain.value() + 
"://" + DEFAULT_NS + "/tp");
-        clientWithHttpLookup.getPartitionsForTopic(tp1).join();
-        Optional<PartitionedTopicMetadata> metadata1 = 
pulsar.getPulsarResources().getNamespaceResources()
-                .getPartitionedTopicResources()
-                .getPartitionedTopicMetadataAsync(TopicName.get(tp1), 
true).join();
-        assertTrue(metadata1.isPresent());
-        assertEquals(metadata1.get().partitions, 3);
-
-        // Binary client.
-        final String tp2 = BrokerTestUtil.newUniqueName(topicDomain.value() + 
"://" + DEFAULT_NS + "/tp");
-        clientWitBinaryLookup.getPartitionsForTopic(tp2).join();
-        Optional<PartitionedTopicMetadata> metadata2 = 
pulsar.getPulsarResources().getNamespaceResources()
-                .getPartitionedTopicResources()
-                .getPartitionedTopicMetadataAsync(TopicName.get(tp2), 
true).join();
-        assertTrue(metadata2.isPresent());
-        assertEquals(metadata2.get().partitions, 3);
-
-        // Verify: lookup semaphore has been releases.
+    protected static void doModifyTopicAutoCreation(PulsarAdmin admin1, 
PulsarService pulsar1,
+                                                  boolean 
allowAutoTopicCreation, TopicType allowAutoTopicCreationType,
+                                                  int defaultNumPartitions) 
throws Exception {
+        admin1.brokers().updateDynamicConfiguration(
+                "allowAutoTopicCreation", allowAutoTopicCreation + "");
+        admin1.brokers().updateDynamicConfiguration(
+                "allowAutoTopicCreationType", allowAutoTopicCreationType + "");
+        admin1.brokers().updateDynamicConfiguration(
+                "defaultNumPartitions", defaultNumPartitions + "");
         Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+            
assertEquals(pulsar1.getConfiguration().isAllowAutoTopicCreation(), 
allowAutoTopicCreation);
+            
assertEquals(pulsar1.getConfiguration().getAllowAutoTopicCreationType(), 
allowAutoTopicCreationType);
+            assertEquals(pulsar1.getConfiguration().getDefaultNumPartitions(), 
defaultNumPartitions);
         });
+    }
 
-        // Cleanup.
-        admin.topics().deletePartitionedTopic(tp1, false);
-        admin.topics().deletePartitionedTopic(tp2, false);
+    protected void modifyTopicAutoCreation(boolean allowAutoTopicCreation,
+                                           TopicType 
allowAutoTopicCreationType,
+                                           int defaultNumPartitions) throws 
Exception {
+        doModifyTopicAutoCreation(admin1, pulsar1, allowAutoTopicCreation, 
allowAutoTopicCreationType,
+                defaultNumPartitions);
+    }
+
+    @Test(dataProvider = "topicDomains")
+    public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain 
topicDomain) throws Exception {
+        modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
+
+        int lookupPermitsBefore = getLookupRequestPermits();
+
+        for (PulsarClientImpl client : getClientsToTest()) {
+            // Verify: the behavior of topic creation.
+            final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() 
+ "://" + DEFAULT_NS + "/tp");
+            client.getPartitionsForTopic(tp).join();
+            Optional<PartitionedTopicMetadata> metadata1 = 
pulsar1.getPulsarResources().getNamespaceResources()
+                    .getPartitionedTopicResources()
+                    .getPartitionedTopicMetadataAsync(TopicName.get(tp), 
true).join();
+            assertTrue(metadata1.isPresent());
+            assertEquals(metadata1.get().partitions, 3);
+
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+
+            // Cleanup.
+            admin1.topics().deletePartitionedTopic(tp, false);
+        }
     }
 
     @DataProvider(name = "autoCreationParamsAll")
@@ -163,40 +253,32 @@ public class GetPartitionMetadataTest extends 
ProducerConsumerBase {
                                                            boolean 
paramMetadataAutoCreationEnabled,
                                                            boolean 
isUsingHttpLookup,
                                                            TopicDomain 
topicDomain) throws Exception {
-        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
-        conf.setDefaultNumPartitions(3);
-        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
-        setup();
+        modifyTopicAutoCreation(configAllowAutoTopicCreation, 
TopicType.PARTITIONED, 3);
 
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
+        int lookupPermitsBefore = getLookupRequestPermits();
 
-        LookupService lookup = getLookupService(isUsingHttpLookup);
         // Create topic.
-        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
-        final TopicName topicName = TopicName.get(topicNameStr);
-        admin.topics().createNonPartitionedTopic(topicNameStr);
-        // Verify.
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-        PartitionedTopicMetadata response =
-                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
-        assertEquals(response.partitions, 0);
-        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
-        assertFalse(partitionedTopics.contains(topicNameStr));
-        List<String> topicList = admin.topics().getList("public/default");
-        for (int i = 0; i < 3; i++) {
-            assertFalse(topicList.contains(topicName.getPartition(i)));
-        }
+        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicNameStr);
+
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            // Verify: the result of get partitioned topic metadata.
+            PartitionedTopicMetadata response =
+                    client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled).join();
+            assertEquals(response.partitions, 0);
+            List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
+            assertFalse(partitionedTopics.contains(topicNameStr));
+            verifyPartitionsNeverCreated(topicNameStr);
 
-        // Verify: lookup semaphore has been releases.
-        Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
-        });
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+        }
 
         // Cleanup.
-        client.close();
-        admin.topics().delete(topicNameStr, false);
+        admin1.topics().delete(topicNameStr, false);
     }
 
     @Test(dataProvider = "autoCreationParamsAll")
@@ -204,36 +286,30 @@ public class GetPartitionMetadataTest extends 
ProducerConsumerBase {
                                                         boolean 
paramMetadataAutoCreationEnabled,
                                                         boolean 
isUsingHttpLookup,
                                                         TopicDomain 
topicDomain) throws Exception {
-        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
-        conf.setDefaultNumPartitions(3);
-        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
-        setup();
+        modifyTopicAutoCreation(configAllowAutoTopicCreation, 
TopicType.PARTITIONED, 3);
 
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
+        int lookupPermitsBefore = getLookupRequestPermits();
 
-        LookupService lookup = getLookupService(isUsingHttpLookup);
         // Create topic.
         final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
-        final TopicName topicName = TopicName.get(topicNameStr);
-        admin.topics().createPartitionedTopic(topicNameStr, 3);
-        // Verify.
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-        PartitionedTopicMetadata response =
-                lookup.getPartitionedTopicMetadata(topicName, 
paramMetadataAutoCreationEnabled).join();
-        assertEquals(response.partitions, 3);
-        List<String> topicList = admin.topics().getList("public/default");
-        assertFalse(topicList.contains(topicNameStr));
-
-        // Verify: lookup semaphore has been releases.
-        Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
-        });
+        admin1.topics().createPartitionedTopic(topicNameStr, 3);
+
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            // Verify: the result of get partitioned topic metadata.
+            PartitionedTopicMetadata response =
+                    client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled).join();
+            assertEquals(response.partitions, 3);
+            verifyNonPartitionedTopicNeverCreated(topicNameStr);
+
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+        }
 
         // Cleanup.
-        client.close();
-        admin.topics().deletePartitionedTopic(topicNameStr, false);
+        admin1.topics().deletePartitionedTopic(topicNameStr, false);
     }
 
     @DataProvider(name = "clients")
@@ -247,76 +323,96 @@ public class GetPartitionMetadataTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "clients")
     public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, 
TopicDomain topicDomain) throws Exception {
-        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
-        conf.setDefaultNumPartitions(3);
-        conf.setAllowAutoTopicCreation(true);
-        setup();
-
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
-
-        LookupService lookup = getLookupService(isUsingHttpLookup);
-        // Create topic.
-        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
-        final TopicName topicName = TopicName.get(topicNameStr);
-        // Verify.
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-        PartitionedTopicMetadata response = 
lookup.getPartitionedTopicMetadata(topicName, true).join();
-        assertEquals(response.partitions, 3);
-        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
-        assertTrue(partitionedTopics.contains(topicNameStr));
-        List<String> topicList = admin.topics().getList("public/default");
-        assertFalse(topicList.contains(topicNameStr));
-        for (int i = 0; i < 3; i++) {
+        modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
+
+        int lookupPermitsBefore = getLookupRequestPermits();
+
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            // Case-1: normal topic.
+            final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+            // Verify: the result of get partitioned topic metadata.
+            PartitionedTopicMetadata response = 
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+            assertEquals(response.partitions, 3);
+            // Verify: the behavior of topic creation.
+            List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
+            assertTrue(partitionedTopics.contains(topicNameStr));
+            verifyNonPartitionedTopicNeverCreated(topicNameStr);
             // The API "getPartitionedTopicMetadata" only creates the 
partitioned metadata, it will not create the
             // partitions.
-            assertFalse(topicList.contains(topicName.getPartition(i)));
+            verifyPartitionsNeverCreated(topicNameStr);
+
+            // Case-2: topic with suffix "-partition-1".
+            final String topicNameStrWithSuffix = BrokerTestUtil.newUniqueName(
+                    topicDomain.value() + "://" + DEFAULT_NS + "/tp") + 
"-partition-1";
+            // Verify: the result of get partitioned topic metadata.
+            PartitionedTopicMetadata response2 =
+                    client.getPartitionedTopicMetadata(topicNameStrWithSuffix, 
true).join();
+            assertEquals(response2.partitions, 0);
+            // Verify: the behavior of topic creation.
+            List<String> partitionedTopics2 =
+                    admin1.topics().getPartitionedTopicList("public/default");
+            assertFalse(partitionedTopics2.contains(topicNameStrWithSuffix));
+            assertFalse(partitionedTopics2.contains(
+                    
TopicName.get(topicNameStrWithSuffix).getPartitionedTopicName()));
+
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+            // Cleanup.
+            admin1.topics().deletePartitionedTopic(topicNameStr, false);
+            try {
+                admin1.topics().delete(topicNameStrWithSuffix, false);
+            } catch (Exception ex) {}
         }
 
-        // Verify: lookup semaphore has been releases.
-        Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
-        });
-
-        // Cleanup.
-        client.close();
-        admin.topics().deletePartitionedTopic(topicNameStr, false);
     }
 
     @Test(dataProvider = "clients")
     public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, 
TopicDomain topicDomain) throws Exception {
-        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-        conf.setAllowAutoTopicCreation(true);
-        setup();
-
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
-
-        LookupService lookup = getLookupService(isUsingHttpLookup);
-        // Create topic.
-        final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
-        final TopicName topicName = TopicName.get(topicNameStr);
-        // Verify.
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-        PartitionedTopicMetadata response = 
lookup.getPartitionedTopicMetadata(topicName, true).join();
-        assertEquals(response.partitions, 0);
-        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
-        assertFalse(partitionedTopics.contains(topicNameStr));
-        List<String> topicList = admin.topics().getList("public/default");
-        assertFalse(topicList.contains(topicNameStr));
-
-        // Verify: lookup semaphore has been releases.
-        Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
-        });
-
-        // Cleanup.
-        client.close();
-        try {
-            admin.topics().delete(topicNameStr, false);
-        } catch (Exception ex) {}
+        modifyTopicAutoCreation(true, TopicType.NON_PARTITIONED, 3);
+
+        int lookupPermitsBefore = getLookupRequestPermits();
+
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            // Case-1: normal topic.
+            final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+            // Verify: the result of get partitioned topic metadata.
+            PartitionedTopicMetadata response = 
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+            assertEquals(response.partitions, 0);
+            // Verify: the behavior of topic creation.
+            List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
+            assertFalse(partitionedTopics.contains(topicNameStr));
+            verifyPartitionsNeverCreated(topicNameStr);
+
+            // Case-2: topic with suffix "-partition-1".
+            final String topicNameStrWithSuffix = BrokerTestUtil.newUniqueName(
+                    topicDomain.value() + "://" + DEFAULT_NS + "/tp") + 
"-partition-1";
+            // Verify: the result of get partitioned topic metadata.
+            PartitionedTopicMetadata response2 =
+                    client.getPartitionedTopicMetadata(topicNameStrWithSuffix, 
true).join();
+            assertEquals(response2.partitions, 0);
+            // Verify: the behavior of topic creation.
+            List<String> partitionedTopics2 =
+                    admin1.topics().getPartitionedTopicList("public/default");
+            assertFalse(partitionedTopics2.contains(topicNameStrWithSuffix));
+            assertFalse(partitionedTopics2.contains(
+                    
TopicName.get(topicNameStrWithSuffix).getPartitionedTopicName()));
+
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+            // Cleanup.
+            try {
+                admin1.topics().delete(topicNameStr, false);
+            } catch (Exception ex) {}
+            try {
+                admin1.topics().delete(topicNameStrWithSuffix, false);
+            } catch (Exception ex) {}
+        }
     }
 
     @DataProvider(name = "autoCreationParamsNotAllow")
@@ -336,64 +432,38 @@ public class GetPartitionMetadataTest extends 
ProducerConsumerBase {
     public void testGetMetadataIfNotAllowedCreate(boolean 
configAllowAutoTopicCreation,
                                                   boolean 
paramMetadataAutoCreationEnabled,
                                                   boolean isUsingHttpLookup) 
throws Exception {
-        if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) 
{
-            // These test cases are for the following PR.
-            // Which was described in the Motivation of 
https://github.com/apache/pulsar/pull/22206.
-            return;
-        }
-        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
-        conf.setDefaultNumPartitions(3);
-        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
-        setup();
-
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
-
-        LookupService lookup = getLookupService(isUsingHttpLookup);
-        // Define topic.
-        final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
-        final TopicName topicName = TopicName.get(topicNameStr);
-        // Verify.
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-        try {
-            lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), 
paramMetadataAutoCreationEnabled).join();
-            fail("Expect a not found exception");
-        } catch (Exception e) {
-            log.warn("", e);
-            Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
-            assertTrue(unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException
-                    || unwrapEx instanceof 
PulsarClientException.NotFoundException);
-        }
+        modifyTopicAutoCreation(configAllowAutoTopicCreation, 
TopicType.PARTITIONED, 3);
 
-        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
-        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
-        assertFalse(partitionedTopics.contains(topicNameStr));
-        List<String> topicList = admin.topics().getList("public/default");
-        assertFalse(topicList.contains(topicNameStr));
-        for (int i = 0; i < 3; i++) {
-            assertFalse(topicList.contains(topicName.getPartition(i)));
-        }
+        int lookupPermitsBefore = getLookupRequestPermits();
 
-        // Verify: lookup semaphore has been releases.
-        Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
-        });
-
-        // Cleanup.
-        client.close();
-    }
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            // Define topic.
+            final String topicNameStr = 
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+            final TopicName topicName = TopicName.get(topicNameStr);
+            // Verify: the result of get partitioned topic metadata.
+            try {
+                client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled)
+                        .join();
+                fail("Expect a not found exception");
+            } catch (Exception e) {
+                Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+                assertTrue(unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                        || unwrapEx instanceof 
PulsarClientException.NotFoundException);
+            }
+            // Verify: the behavior of topic creation.
+            List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
+            
pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                    .partitionedTopicExists(topicName);
+            assertFalse(partitionedTopics.contains(topicNameStr));
+            verifyNonPartitionedTopicNeverCreated(topicNameStr);
+            verifyPartitionsNeverCreated(topicNameStr);
 
-    @DataProvider(name = "autoCreationParamsForNonPersistentTopic")
-    public Object[][] autoCreationParamsForNonPersistentTopic(){
-        return new Object[][]{
-                // configAllowAutoTopicCreation, 
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
-                {true, true, true},
-                {true, true, false},
-                {false, true, true},
-                {false, true, false},
-                {false, false, true}
-        };
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+        }
     }
 
     /**
@@ -408,66 +478,46 @@ public class GetPartitionMetadataTest extends 
ProducerConsumerBase {
      *   param-auto-create = false
      *     HTTP API: not found error
      *     binary API: not support
-     *  This test only guarantees that the behavior is the same as before. The 
following separated PR will fix the
-     *  incorrect behavior.
+     * After PIP-344, the behavior is the same as that of persistent topics, 
as described in PIP-344.
      */
-    @Test(dataProvider = "autoCreationParamsForNonPersistentTopic")
-    public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean 
configAllowAutoTopicCreation,
+    @Test(dataProvider = "autoCreationParamsNotAllow")
+    public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean 
configAllowAutoTopicCreation,
                                                   boolean 
paramMetadataAutoCreationEnabled,
                                                   boolean isUsingHttpLookup) 
throws Exception {
-        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
-        conf.setDefaultNumPartitions(3);
-        conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
-        setup();
-
-        Semaphore semaphore = 
pulsar.getBrokerService().getLookupRequestSemaphore();
-        int lookupPermitsBefore = semaphore.availablePermits();
-
-        LookupService lookup = getLookupService(isUsingHttpLookup);
-        // Define topic.
-        final String topicNameStr = 
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
-        final TopicName topicName = TopicName.get(topicNameStr);
-        // Verify.
-        // Regarding non-persistent topic, we do not know whether it exists or 
not.
-        // Broker will return a non-partitioned metadata if partitioned 
metadata does not exist.
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-
-        if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled 
&& isUsingHttpLookup) {
+        modifyTopicAutoCreation(configAllowAutoTopicCreation, 
TopicType.PARTITIONED, 3);
+
+        int lookupPermitsBefore = getLookupRequestPermits();
+
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            // Define topic.
+            final String topicNameStr = 
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
+            final TopicName topicName = TopicName.get(topicNameStr);
+            // Verify: the result of get partitioned topic metadata.
             try {
-                
lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), 
paramMetadataAutoCreationEnabled)
+                PartitionedTopicMetadata topicMetadata = client
+                        .getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled)
                         .join();
-                Assert.fail("Expected a not found ex");
+                log.info("Get topic metadata: {}", topicMetadata.partitions);
+                fail("Expected a not found ex");
             } catch (Exception ex) {
-                // Cleanup.
-                client.close();
-                return;
+                Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+                assertTrue(unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                        || unwrapEx instanceof 
PulsarClientException.NotFoundException);
             }
-        }
 
-        PartitionedTopicMetadata metadata = lookup
-                .getPartitionedTopicMetadata(TopicName.get(topicNameStr), 
paramMetadataAutoCreationEnabled).join();
-        if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
-            assertEquals(metadata.partitions, 3);
-        } else {
-            assertEquals(metadata.partitions, 0);
-        }
-
-        List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList("public/default");
-        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                .partitionedTopicExists(topicName);
-        if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
-            assertTrue(partitionedTopics.contains(topicNameStr));
-        } else {
+            // Verify: the behavior of topic creation.
+            List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
+            
pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                    .partitionedTopicExists(topicName);
             assertFalse(partitionedTopics.contains(topicNameStr));
+            verifyNonPartitionedTopicNeverCreated(topicNameStr);
+            verifyPartitionsNeverCreated(topicNameStr);
         }
 
         // Verify: lookup semaphore has been releases.
         Awaitility.await().untilAsserted(() -> {
-            int lookupPermitsAfter = semaphore.availablePermits();
-            assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+            assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
         });
-
-        // Cleanup.
-        client.close();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 9aa29f08c5c..c9457e1a888 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -357,9 +358,12 @@ public class TopicsTest extends 
MockedPulsarServiceBaseTest {
         CompletableFuture future = new CompletableFuture();
         future.completeExceptionally(new BrokerServiceException("Fake 
Exception"));
         CompletableFuture existFuture = new CompletableFuture();
-        existFuture.complete(true);
+        existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists());
         
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
         doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+        CompletableFuture existBooleanFuture = new CompletableFuture();
+        existBooleanFuture.complete(false);
+        
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
         doReturn(nameSpaceService).when(pulsar).getNamespaceService();
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
         ProducerMessages producerMessages = new ProducerMessages();
@@ -370,7 +374,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest 
{
         topics.produceOnPersistentTopic(asyncResponse, testTenant, 
testNamespace, testTopicName, false, producerMessages);
         ArgumentCaptor<RestException> responseCaptor = 
ArgumentCaptor.forClass(RestException.class);
         verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertEquals(responseCaptor.getValue().getMessage(), "Can't 
find owner of given topic.");
+        
Assert.assertTrue(responseCaptor.getValue().getMessage().contains(topicName + " 
not found"));
     }
 
     @Test
@@ -378,8 +382,11 @@ public class TopicsTest extends 
MockedPulsarServiceBaseTest {
         String topicName = "persistent://" + testTenant + "/" + testNamespace 
+ "/" + testTopicName;
         NamespaceService nameSpaceService = mock(NamespaceService.class);
         CompletableFuture existFuture = new CompletableFuture();
-        existFuture.complete(false);
+        existFuture.complete(TopicExistsInfo.newTopicNotExists());
+        CompletableFuture existBooleanFuture = new CompletableFuture();
+        existBooleanFuture.complete(false);
         doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+        
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
         doReturn(nameSpaceService).when(pulsar).getNamespaceService();
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
         ProducerMessages producerMessages = new ProducerMessages();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 7004eae29b5..ab492de055b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.lookup.NamespaceData;
 import org.apache.pulsar.broker.lookup.RedirectData;
 import org.apache.pulsar.broker.lookup.v1.TopicLookup;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
@@ -149,9 +150,12 @@ public class HttpTopicLookupv2Test {
         config.setAuthorizationEnabled(true);
 
         NamespaceService namespaceService = pulsar.getNamespaceService();
-        CompletableFuture<Boolean> future = new CompletableFuture<>();
-        future.complete(false);
+        CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
+        future.complete(TopicExistsInfo.newTopicNotExists());
         
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+        CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+        booleanFuture.complete(false);
+        
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
 
         AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
         destLookup.lookupTopicAsync(asyncResponse1, 
TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", 
false, null, null);
@@ -260,9 +264,12 @@ public class HttpTopicLookupv2Test {
         policies3Future.complete(Optional.of(policies3));
         
doReturn(policies3Future).when(namespaceResources).getPoliciesAsync(namespaceName2);
         NamespaceService namespaceService = pulsar.getNamespaceService();
-        CompletableFuture<Boolean> future = new CompletableFuture<>();
-        future.complete(false);
+        CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
+        future.complete(TopicExistsInfo.newTopicNotExists());
         
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+        CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+        booleanFuture.complete(false); // Stubbed result: no non-partitioned topic exists.
+        
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
         destLookup.lookupTopicAsync(asyncResponse, 
TopicDomain.persistent.value(), property, cluster, ns2,
                 "invalid-localCluster", false, null, null);
         verify(asyncResponse).resume(arg.capture());
@@ -294,8 +301,8 @@ public class HttpTopicLookupv2Test {
         doReturn(uri).when(uriInfo).getRequestUri();
         config.setAuthorizationEnabled(true);
         NamespaceService namespaceService = pulsar.getNamespaceService();
-        CompletableFuture<Boolean> future = new CompletableFuture<>();
-        future.complete(false);
+        CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
+        future.complete(TopicExistsInfo.newTopicNotExists());
         
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
 
         // Get the current semaphore first
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index a0313ef7436..0b0d38a071e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -815,14 +815,15 @@ public class NamespaceServiceTest extends BrokerTestBase {
         String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
         Awaitility.await().untilAsserted(() -> {
-            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get());
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get().isExists());
         });
 
         String partitionedTopic = topicDomain + "://prop/ns-abc/" + 
UUID.randomUUID();
         admin.topics().createPartitionedTopic(partitionedTopic, 5);
         Awaitility.await().untilAsserted(() -> {
-            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get());
-            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic
 + "-partition-2")).get());
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get().isExists());
+            assertTrue(pulsar.getNamespaceService()
+                    .checkTopicExists(TopicName.get(partitionedTopic + 
"-partition-2")).get().isExists());
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
index 7790940c132..8fdf0723ea8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -99,6 +100,7 @@ public class TopicGCTest extends ProducerConsumerBase {
         Consumer<String> consumerAllPartition = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
                 
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
         Message<String> msg = consumerAllPartition.receive(2, 
TimeUnit.SECONDS);
+        assertNotNull(msg);
         String receivedMsgValue = msg.getValue();
         log.info("received msg: {}", receivedMsgValue);
         consumerAllPartition.acknowledge(msg);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 7735f66e783..4d6cf96a010 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -32,6 +32,7 @@ import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -58,7 +59,6 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Getter(AccessLevel.PUBLIC)
@@ -104,6 +104,31 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         }
     }
 
+    private CompletableFuture<Boolean> checkDlqAlreadyExists(String topic) {
+        CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
+        client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata 
-> {
+            TopicName topicName = TopicName.get(topic);
+            if (topicName.isPersistent()) {
+                // Either partitioned or non-partitioned, it exists.
+                existsFuture.complete(true);
+            } else {
+                // If it is a non-persistent topic, return true only if it is a 
partitioned topic.
+                existsFuture.complete(metadata != null && metadata.partitions 
> 0);
+            }
+        }).exceptionally(ex -> {
+            Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+            if (actEx instanceof PulsarClientException.NotFoundException
+                    || actEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                    || actEx instanceof 
PulsarAdminException.NotFoundException) {
+                existsFuture.complete(false);
+            } else {
+                existsFuture.completeExceptionally(ex);
+            }
+            return null;
+        });
+        return existsFuture;
+    }
+
     @Override
     public CompletableFuture<Consumer<T>> subscribeAsync() {
         if (conf.getTopicNames().isEmpty() && conf.getTopicsPattern() == null) 
{
@@ -135,20 +160,18 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
             DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
             if (deadLetterPolicy == null || 
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
                     || 
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
-                CompletableFuture<PartitionedTopicMetadata> 
retryLetterTopicMetadata =
-                        
client.getPartitionedTopicMetadata(oldRetryLetterTopic, true);
-                CompletableFuture<PartitionedTopicMetadata> 
deadLetterTopicMetadata =
-                        client.getPartitionedTopicMetadata(oldDeadLetterTopic, 
true);
+                CompletableFuture<Boolean> retryLetterTopicMetadata = 
checkDlqAlreadyExists(oldRetryLetterTopic);
+                CompletableFuture<Boolean> deadLetterTopicMetadata = 
checkDlqAlreadyExists(oldDeadLetterTopic);
                 applyDLQConfig = 
CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
                         .thenAccept(__ -> {
                             String retryLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName()
                                     + 
RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
                             String deadLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName()
                                     + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-                            if (retryLetterTopicMetadata.join().partitions > 
0) {
+                            if (retryLetterTopicMetadata.join()) {
                                 retryLetterTopic = oldRetryLetterTopic;
                             }
-                            if (deadLetterTopicMetadata.join().partitions > 0) 
{
+                            if (deadLetterTopicMetadata.join()) {
                                 deadLetterTopic = oldDeadLetterTopic;
                             }
                             if (deadLetterPolicy == null) {

Reply via email to