This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f0429c1cd8 [fix][broker] Add topic consistency check (#24118)
7f0429c1cd8 is described below

commit 7f0429c1cd81fbde6e0213a84e42f1ea55b5f7d2
Author: Zixuan Liu <node...@gmail.com>
AuthorDate: Wed Mar 26 18:01:20 2025 +0800

    [fix][broker] Add topic consistency check (#24118)
    
    Signed-off-by: Zixuan Liu <node...@gmail.com>
---
 .../pulsar/broker/namespace/NamespaceService.java  |  35 +++--
 .../pulsar/broker/service/BrokerService.java       | 150 +++++++++++----------
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   6 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java |  20 +--
 .../broker/protocol/PulsarClientBasedHandler.java  |   7 +-
 .../broker/service/ExclusiveProducerTest.java      |  16 ++-
 .../pulsar/broker/service/PersistentTopicTest.java |   8 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   6 +-
 .../nonpersistent/NonPersistentTopicTest.java      |  20 ++-
 .../service/persistent/PersistentTopicTest.java    |  35 +----
 .../pulsar/client/api/ConsumerCreationTest.java    | 127 +++++++++++++++++
 .../pulsar/client/api/ProducerCreationTest.java    |  73 ++++++++++
 .../pulsar/client/impl/LookupServiceTest.java      |  82 +++++++++++
 .../pulsar/client/api/PulsarClientException.java   |   4 +-
 14 files changed, 433 insertions(+), 156 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b2ee299bb03..8f5cef1bdff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1412,17 +1412,26 @@ public class NamespaceService implements AutoCloseable {
      * Check topic exists( partitioned or non-partitioned ).
      */
     public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName 
topic) {
-        return pulsar.getBrokerService()
-            
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
-            .thenCompose(metadata -> {
-                if (metadata.partitions > 0) {
-                    return CompletableFuture.completedFuture(
-                            
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
-                }
-                return checkNonPartitionedTopicExists(topic)
-                    .thenApply(b -> b ? 
TopicExistsInfo.newNonPartitionedTopicExists()
-                            : TopicExistsInfo.newTopicNotExists());
-            });
+        // For non-persistent/persistent partitioned topic, which has metadata.
+        return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                        topic.isPartitioned() ? 
TopicName.get(topic.getPartitionedTopicName()) : topic)
+                .thenCompose(metadata -> {
+                    if (metadata.partitions > 0) {
+                        if (!topic.isPartitioned()) {
+                            return CompletableFuture.completedFuture(
+                                    
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
+                        } else {
+                            if (topic.getPartitionIndex() < 
metadata.partitions) {
+                                return CompletableFuture.completedFuture(
+                                        
TopicExistsInfo.newNonPartitionedTopicExists());
+                            }
+                        }
+                    }
+                    // Direct query the single topic.
+                    return checkNonPartitionedTopicExists(topic).thenApply(
+                            b -> b ? 
TopicExistsInfo.newNonPartitionedTopicExists() :
+                                    TopicExistsInfo.newTopicNotExists());
+                });
     }
 
     /***
@@ -1443,12 +1452,12 @@ public class NamespaceService implements AutoCloseable {
      */
     public CompletableFuture<Boolean> 
checkNonPersistentNonPartitionedTopicExists(String topic) {
         TopicName topicName = TopicName.get(topic);
-        // "non-partitioned & non-persistent" topics only exist on the owner 
broker.
+        // "non-partitioned & non-persistent" topics only exist on the cache 
of the owner broker.
         return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned 
-> {
             // The current broker is the owner.
             if (isOwned) {
                CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = 
pulsar.getBrokerService()
-                       .getTopic(topic, false);
+                       .getTopics().get(topic);
                if (nonPersistentTopicFuture != null) {
                    return 
nonPersistentTopicFuture.thenApply(Optional::isPresent);
                } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 107bb01ffa0..dd3486496e9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1061,6 +1061,70 @@ public class BrokerService implements Closeable {
         return getTopic(TopicName.get(topic), createIfMissing, properties);
     }
 
+    /**
+     * Validates that the topic is consistent with its partition metadata.
+     *
+     * This method ensures the topic (partitioned or non-partitioned) correctly
+     * matches the actual partitions in the metadata. Inconsistencies typically
+     * indicate configuration issues or metadata synchronization problems.
+     *
+     * This validation is particularly important in geo-replicated 
environments where
+     * topic metadata may not be fully synchronized across all regions, 
potentially
+     * leading to access errors if not properly handled.
+     *
+     * @param topicName The topic name to validate
+     * @return CompletableFuture that completes normally if validation passes, 
or
+     * completes exceptionally with NotAllowedException if validation fails
+     */
+    private CompletableFuture<Void> validateTopicConsistency(TopicName 
topicName) {
+        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            // Skip validation for heartbeat namespace.
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName baseTopicName =
+                topicName.isPartitioned() ? 
TopicName.get(topicName.getPartitionedTopicName()) : topicName;
+        return fetchPartitionedTopicMetadataAsync(baseTopicName)
+                .thenCompose(metadata -> {
+                    if (topicName.isPartitioned()) {
+                        if (metadata.partitions == 0) {
+                            // Edge case: When a complete partitioned topic 
name is provided but metadata shows 0
+                            // partitions.
+                            // This indicates that the partitioned topic 
metadata doesn't exist.
+                            //
+                            // Resolution options:
+                            // 1. Creates the partitioned topic via admin API.
+                            // 2. Uses the base topic name and then rely on 
auto-creation the partitioned topic if
+                            // enabled.
+                            return FutureUtil.failedFuture(
+                                    new 
BrokerServiceException.NotAllowedException(
+                                            "Partition metadata not found for 
the partitioned topic: " + topicName));
+                        }
+                        if (topicName.getPartitionIndex() >= 
metadata.partitions) {
+                            final String errorMsg =
+                                    String.format(
+                                            "Illegal topic partition name %s 
with max allowed "
+                                                    + "%d partitions", 
topicName,
+                                            metadata.partitions);
+                            log.warn(errorMsg);
+                            return FutureUtil.failedFuture(
+                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
+                        }
+                    } else if (metadata.partitions > 0) {
+                        // Edge case: Non-partitioned topic name was provided, 
but metadata indicates this is
+                        // actually a partitioned
+                        // topic (partitions > 0).
+                        //
+                        // Resolution: Must use the complete partitioned topic 
name('topic-name-partition-N').
+                        //
+                        // This ensures proper routing to the specific 
partition and prevents ambiguity in topic
+                        // addressing.
+                        return FutureUtil.failedFuture(new 
BrokerServiceException.NotAllowedException(
+                                "Found partitioned metadata for 
non-partitioned topic: " + topicName));
+                    }
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
     /**
      * Retrieves or creates a topic based on the specified parameters.
      * 0. If disable PersistentTopics or NonPersistentTopics, it will return a 
failed future with NotAllowedException.
@@ -1107,30 +1171,9 @@ public class BrokerService implements Closeable {
                         throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
                     }).thenCompose(optionalTopicPolicies -> {
                         final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
-                        if (topicName.isPartitioned()) {
-                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
-                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
-                                    .thenCompose((metadata) -> {
-                                        // Allow creating non-partitioned 
persistent topic that name includes
-                                        // `partition`
-                                        if (metadata.partitions == 0
-                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
-                                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-                                                    
loadOrCreatePersistentTopic(tpName,
-                                                            createIfMissing, 
properties, topicPolicies));
-                                        } else {
-                                            final String errorMsg =
-                                                    String.format("Illegal 
topic partition name %s with max allowed "
-                                                            + "%d partitions", 
topicName, metadata.partitions);
-                                            log.warn(errorMsg);
-                                            return FutureUtil.failedFuture(
-                                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
-                                        }
-                                    });
-                        } else {
-                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-                                    loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies));
-                        }
+                        return topics.computeIfAbsent(topicName.toString(),
+                                (tpName) -> 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties,
+                                        topicPolicies));
                     });
                 });
             } else {
@@ -1144,29 +1187,10 @@ public class BrokerService implements Closeable {
                 if (!topics.containsKey(topicName.toString())) {
                     topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.BEFORE);
                 }
-                if (topicName.isPartitioned()) {
-                    final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-                    return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
-                        if (topicName.getPartitionIndex() < 
metadata.partitions) {
-                            return 
topics.computeIfAbsent(topicName.toString(), (name) -> {
-                                topicEventsDispatcher
-                                        .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
-
-                                CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
-
-                                CompletableFuture<Optional<Topic>> eventFuture 
= topicEventsDispatcher
-                                        .notifyOnCompletion(res, 
topicName.toString(), TopicEvent.CREATE);
-                                topicEventsDispatcher
-                                        .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
-                                return res;
-                            });
-                        }
-                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
-                        return 
CompletableFuture.completedFuture(Optional.empty());
-                    });
-                } else if (createIfMissing) {
+                if (topicName.isPartitioned() || createIfMissing) {
                     return topics.computeIfAbsent(topicName.toString(), (name) 
-> {
-                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+                        topicEventsDispatcher
+                                .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
 
                         CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
 
@@ -1176,14 +1200,13 @@ public class BrokerService implements Closeable {
                                 .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
                         return res;
                     });
-                } else {
-                    CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
-                    if (topicFuture == null) {
-                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
-                        topicFuture = 
CompletableFuture.completedFuture(Optional.empty());
-                    }
-                    return topicFuture;
                 }
+                CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
+                if (topicFuture == null) {
+                    topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
+                    topicFuture = 
CompletableFuture.completedFuture(Optional.empty());
+                }
+                return topicFuture;
             }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", 
topicName, e);
@@ -1357,8 +1380,9 @@ public class BrokerService implements Closeable {
             topicFuture.completeExceptionally(e);
             return topicFuture;
         }
-        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
-        isOwner.thenRun(() -> {
+        checkTopicNsOwnership(topic)
+                .thenCompose((__) -> 
validateTopicConsistency(TopicName.get(topic)))
+                .thenRun(() -> {
             nonPersistentTopic.initialize()
                     .thenCompose(__ -> nonPersistentTopic.checkReplication())
                     .thenRun(() -> {
@@ -1375,17 +1399,7 @@ public class BrokerService implements Closeable {
                 return null;
             });
         }).exceptionally(e -> {
-            log.warn("CheckTopicNsOwnership fail when 
createNonPersistentTopic! {}", topic, e.getCause());
-            // CheckTopicNsOwnership fail dont create nonPersistentTopic, when 
topic do lookup will find the correct
-            // broker. When client get non-persistent-partitioned topic
-            // metadata will the non-persistent-topic will be created.
-            // so we should add checkTopicNsOwnership logic otherwise the 
topic will be created
-            // if it dont own by this broker,we should return success
-            // otherwise it will keep retrying getPartitionedTopicMetadata
-            topicFuture.complete(Optional.of(nonPersistentTopic));
-            // after get metadata return success, we should delete this topic 
from this broker, because this topic not
-            // owner by this broker and it don't initialize and 
checkReplication
-            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
+            
topicFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
             return null;
         });
 
@@ -1772,8 +1786,8 @@ public class BrokerService implements Closeable {
                 : CompletableFuture.completedFuture(null);
 
         CompletableFuture<Void> isTopicAlreadyMigrated = 
checkTopicAlreadyMigrated(topicName);
-
-        maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated)
+        maxTopicsCheck.thenCompose(partitionedTopicMetadata -> 
validateTopicConsistency(topicName))
+                .thenCompose(__ -> isTopicAlreadyMigrated)
                 .thenCompose(__ -> getManagedLedgerConfig(topicName, 
topicPolicies))
         .thenAccept(managedLedgerConfig -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index cea43cc9345..e9ca122bba1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2912,10 +2912,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() 
throws Exception {
-        // Force to create a topic
-        
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 
0);
-        assertEquals(admin.topics().getList("prop-xyz/ns1"),
-                List.of("persistent://prop-xyz/ns1/ds2-partition-2"));
+        // Create a topic
+        admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/ds2", 
3);
 
         // create consumer and subscription
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c90ad15242c..e06abd972c1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.LookupTopicResult;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -101,16 +103,14 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
 
         final String partition = "persistent://" + namespaceName + 
"/test-partitioned-topi-auto-creation-partition-0";
 
-        producer = pulsarClient.newProducer()
-                .topic(partition)
-                .create();
-
-        partitionedTopics = 
admin.topics().getPartitionedTopicList(namespaceName);
-        topics = admin.topics().getList(namespaceName);
-        assertEquals(partitionedTopics.size(), 0);
-        assertEquals(topics.size(), 1);
-
-        producer.close();
+        // The Pulsar doesn't automatically create the metadata for the single 
partition, so the producer creation
+        // will fail.
+        assertThrows(NotAllowedException.class, () -> {
+            @Cleanup
+            Producer<byte[]> ignored = pulsarClient.newProducer()
+                    .topic(partition)
+                    .create();
+        });
     }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
index ed9881a8cad..3d24fe3ce38 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
@@ -79,9 +79,11 @@ public class PulsarClientBasedHandler implements 
ProtocolHandler {
 
     @Override
     public void start(BrokerService service) {
+        @Cleanup
+        PulsarAdmin admin = null;
         try {
             final var port = 
service.getPulsar().getListenPortHTTP().orElseThrow();
-            @Cleanup final var admin = 
PulsarAdmin.builder().serviceHttpUrl("http://localhost:"; + port).build();
+            admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:"; + 
port).build();
             try {
                 admin.clusters().createCluster(cluster, ClusterData.builder()
                         .serviceUrl(service.getPulsar().getWebServiceAddress())
@@ -103,6 +105,7 @@ public class PulsarClientBasedHandler implements 
ProtocolHandler {
             throw new RuntimeException(e);
         }
         try {
+            admin.topics().createPartitionedTopic(topic, partitions);
             final var port = service.getListenPort().orElseThrow();
             client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
port).build();
             readers = new ArrayList<>();
@@ -122,7 +125,7 @@ public class PulsarClientBasedHandler implements 
ProtocolHandler {
                     });
                 }
             });
-        } catch (PulsarClientException e) {
+        } catch (PulsarClientException | PulsarAdminException e) {
             throw new RuntimeException(e);
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 5f95e557b8c..33a34d3fff4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -19,20 +19,21 @@
 package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import io.netty.util.HashedWheelTimer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -316,6 +317,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
     public void topicDeleted(String ignored, boolean partitioned) throws 
Exception {
         String topic = newTopic("persistent", partitioned);
 
+        @Cleanup
         Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
                 .accessMode(ProducerAccessMode.Exclusive)
@@ -329,8 +331,14 @@ public class ExclusiveProducerTest extends BrokerTestBase {
             admin.topics().delete(topic, true);
         }
 
-        // The producer should be able to publish again on the topic
-        p1.send("msg-2");
+        if (!partitioned) {
+            // The producer should be able to publish again on the topic
+            p1.send("msg-2");
+        } else {
+            // The partitioned topic is deleted, the producer should not be 
able to publish again on the topic.
+            // Partitioned metadata is required to publish messages to the 
topic.
+            assertThrows(TimeoutException.class, () -> p1.send("msg-2"));
+        }
     }
 
     @Test(dataProvider = "topics")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 92b767104f6..0863e1ec5c4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -127,7 +127,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
@@ -141,7 +140,7 @@ import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
+import 
org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore.OperationType;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -1464,8 +1463,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
 
         // create topic
-        
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                .createPartitionedTopic(TopicName.get(successTopicName), new 
PartitionedTopicMetadata(2));
         PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
 
         Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
@@ -1477,8 +1474,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
 
         metadataStore.failConditional(new MetadataStoreException("injected 
error"), (op, path) ->
-                op == FaultInjectionMetadataStore.OperationType.PUT &&
-                        
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"));
+                op == OperationType.EXISTS && 
path.equals("/admin/flags/policies-readonly"));
         try {
             topic.delete().get();
             fail();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d1d7358f346..60932e09116 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -71,7 +71,7 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
@@ -1208,7 +1208,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             if (!isPartitionedTopic) {
                 fail("Topic creation should not fail without any partitioned 
topic");
             }
-            assertTrue(e.getCause() instanceof NamingException);
+            assertTrue(e.getCause() instanceof NotAllowedException);
         }
 
         // non-persistent topic test
@@ -1221,7 +1221,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             if (!isPartitionedTopic) {
                 fail("Topic creation should not fail without any partitioned 
topic");
             }
-            assertTrue(e.getCause() instanceof NamingException);
+            assertTrue(e.getCause() instanceof NotAllowedException);
         }
 
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index e0d6a432bda..a654b477b45 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.UUID;
@@ -43,11 +47,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-
 @Test(groups = "broker")
 public class NonPersistentTopicTest extends BrokerTestBase {
 
@@ -113,19 +112,16 @@ public class NonPersistentTopicTest extends 
BrokerTestBase {
     }
 
     @Test
-    public void testCreateNonExistentPartitions() throws PulsarAdminException, 
PulsarClientException {
+    public void testCreateNonExistentPartitions() throws PulsarAdminException {
         final String topicName = 
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
         admin.topics().createPartitionedTopic(topicName, 4);
         TopicName partition = TopicName.get(topicName).getPartition(4);
-        try {
+        assertThrows(PulsarClientException.NotAllowedException.class, () -> {
             @Cleanup
-            Producer<byte[]> producer = pulsarClient.newProducer()
+            Producer<byte[]> ignored = pulsarClient.newProducer()
                     .topic(partition.toString())
                     .create();
-            fail("unexpected behaviour");
-        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
-
-        }
+        });
         
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 
4);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 3e74ab5e1ac..12b9b0568b7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -36,6 +34,8 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -578,37 +578,6 @@ public class PersistentTopicTest extends BrokerTestBase {
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }
 
-    @Test
-    public void testCompatibilityWithPartitionKeyword() throws 
PulsarAdminException, PulsarClientException {
-        final String topicName = 
"persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword";
-        TopicName topicNameEntity = TopicName.get(topicName);
-        String partition2 = topicNameEntity.getPartition(2).toString();
-        // Create a non-partitioned topic with -partition- keyword
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(partition2)
-                .create();
-        List<String> topics = admin.topics().getList("prop/ns-abc");
-        // Close previous producer to simulate reconnect
-        producer.close();
-        // Disable auto topic creation
-        conf.setAllowAutoTopicCreation(false);
-        // Check the topic exist in the list.
-        Assert.assertTrue(topics.contains(partition2));
-        // Check this topic has no partition metadata.
-        Assert.assertThrows(PulsarAdminException.NotFoundException.class,
-                () -> admin.topics().getPartitionedTopicMetadata(topicName));
-        // Reconnect to the broker and expect successful because the topic has 
existed in the broker.
-        producer = pulsarClient.newProducer()
-                .topic(partition2)
-                .create();
-        producer.close();
-        // Check the topic exist in the list again.
-        Assert.assertTrue(topics.contains(partition2));
-        // Check this topic has no partition metadata again.
-        Assert.assertThrows(PulsarAdminException.NotFoundException.class,
-                () -> admin.topics().getPartitionedTopicMetadata(topicName));
-    }
-
     @Test
     public void testDeleteTopicFail() throws Exception {
         final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_"
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
new file mode 100644
index 00000000000..a81dbe02b34
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertThrows;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ConsumerCreationTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "topicDomainProvider")
+    public Object[][] topicDomainProvider() {
+        return new Object[][]{
+                {TopicDomain.persistent},
+                {TopicDomain.non_persistent}
+        };
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void testCreateConsumerWhenTopicTypeMismatch(TopicDomain domain)
+            throws PulsarAdminException, PulsarClientException {
+        String nonPartitionedTopic =
+                TopicName.get(domain.value(), "public", "default",
+                                
"testCreateConsumerWhenTopicTypeMismatch-nonPartitionedTopic")
+                        .toString();
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+
+        // Topic type is non-partitioned, Trying to create consumer on 
partitioned topic.
+        assertThrows(NotAllowedException.class, () -> {
+            @Cleanup
+            Consumer<byte[]> ignored =
+                    
pulsarClient.newConsumer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString())
+                            .subscriptionName("my-sub").subscribe();
+        });
+
+        // Topic type is partitioned, Trying to create consumer on 
non-partitioned topic.
+        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
+                        
"testCreateConsumerWhenTopicTypeMismatch-partitionedTopic")
+                .toString();
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
+        // Works fine because the lookup can help our to find the correct 
topic.
+        {
+            @Cleanup
+            Consumer<byte[]> ignored =
+                    
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(2).toString())
+                            .subscriptionName("my-sub").subscribe();
+        }
+
+        // Partition index is out of range.
+        assertThrows(NotAllowedException.class, () -> {
+            @Cleanup
+            Consumer<byte[]> ignored =
+                    
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
+                            .subscriptionName("my-sub").subscribe();
+        });
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain 
domain)
+            throws PulsarAdminException, PulsarClientException {
+        testCreateConsumerWhenSinglePartitionIsDeleted(domain, false);
+        testCreateConsumerWhenSinglePartitionIsDeleted(domain, true);
+    }
+
+    private void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain 
domain, boolean allowAutoTopicCreation)
+            throws PulsarAdminException, PulsarClientException {
+        conf.setAllowAutoTopicCreation(allowAutoTopicCreation);
+
+        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
+                        "testCreateConsumerWhenSinglePartitionIsDeleted-" + 
allowAutoTopicCreation)
+                .toString();
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+        
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
+
+        // Non-persistent topic only have the metadata, and no partition, so 
it works fine.
+        if (allowAutoTopicCreation || 
domain.equals(TopicDomain.non_persistent)) {
+            @Cleanup
+            Consumer<byte[]> ignored =
+                    
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
+        } else {
+            
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
+                @Cleanup
+                Consumer<byte[]> ignored =
+                        
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
+            });
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index d5734588288..cd75383a487 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
+import lombok.Cleanup;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -191,4 +194,74 @@ public class ProducerCreationTest extends 
ProducerConsumerBase {
 
         
Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
     }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void testCreateProducerWhenTopicTypeMismatch(TopicDomain domain)
+            throws PulsarAdminException, PulsarClientException {
+        String nonPartitionedTopic =
+                TopicName.get(domain.value(), "public", "default",
+                                
"testCreateProducerWhenTopicTypeMismatch-nonPartitionedTopic")
+                        .toString();
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+
+        // Topic type is non-partitioned, trying to create producer on the 
complete partitioned topic.
+        // Should throw NotAllowedException.
+        assertThrows(NotAllowedException.class, () -> {
+            @Cleanup
+            Producer<byte[]> ignored =
+                    
pulsarClient.newProducer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString())
+                            .create();
+        });
+
+        // Topic type is partitioned, trying to create producer on the base 
partitioned topic.
+        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
+                        
"testCreateProducerWhenTopicTypeMismatch-partitionedTopic")
+                .toString();
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
+        // Works fine because the lookup can help our to find all the topics.
+        {
+            @Cleanup
+            Producer<byte[]> ignored =
+                    
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartitionedTopicName())
+                            .create();
+        }
+
+        // Partition index is out of range.
+        assertThrows(NotAllowedException.class, () -> {
+            @Cleanup
+            Producer<byte[]> ignored =
+                    
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
+                            .create();
+        });
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain 
domain)
+            throws PulsarAdminException, PulsarClientException {
+        testCreateProducerWhenSinglePartitionIsDeleted(domain, false);
+        testCreateProducerWhenSinglePartitionIsDeleted(domain, true);
+    }
+
+    private void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain 
domain, boolean allowAutoTopicCreation)
+            throws PulsarAdminException, PulsarClientException {
+        conf.setAllowAutoTopicCreation(allowAutoTopicCreation);
+
+        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
+                        "testCreateProducerWhenSinglePartitionIsDeleted-" + 
allowAutoTopicCreation)
+                .toString();
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+        
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
+
+        // Non-persistent topic only have the metadata, and no partition, so 
it works fine.
+        if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) {
+            @Cleanup
+            Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
+        } else {
+            
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
+                @Cleanup
+                Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
+            });
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
index 59cb7ae03d0..c4ef53b292b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
@@ -19,13 +19,19 @@
 package org.apache.pulsar.client.impl;
 
 import static 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.AfterClass;
@@ -125,4 +131,80 @@ public class LookupServiceTest extends 
ProducerConsumerBase {
         admin.topics().delete(nonPartitionedTopic, false);
     }
 
+    @Test(dataProvider = "isUsingHttpLookup")
+    public void testGetPartitionedTopicMetadataByPulsarClient(boolean 
isUsingHttpLookup) throws PulsarAdminException {
+        LookupService lookupService = getLookupService(isUsingHttpLookup);
+
+        // metadataAutoCreationEnabled is true.
+        assertThat(lookupService.getPartitionedTopicMetadata(
+                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), 
true))
+                .succeedsWithin(3, TimeUnit.SECONDS)
+                .matches(n -> n.partitions == 0);
+
+        // metadataAutoCreationEnabled is true.
+        // Allow the get the metadata of single partition topic, because the 
auto-creation is enabled.
+        // But the producer/consumer is unavailable because the topic doesn't 
have the metadata.
+        assertThat(lookupService.getPartitionedTopicMetadata(
+                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + 
"-partition-10"),
+                true))
+                .succeedsWithin(3, TimeUnit.SECONDS)
+                .matches(n -> n.partitions == 0);
+
+        Class<? extends Throwable> expectedExceptionClass =
+                isUsingHttpLookup ? 
PulsarClientException.NotFoundException.class :
+                        PulsarClientException.TopicDoesNotExistException.class;
+        // metadataAutoCreationEnabled is false.
+        assertThat(lookupService.getPartitionedTopicMetadata(
+                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), 
false))
+                .failsWithin(3, TimeUnit.SECONDS)
+                .withThrowableThat()
+                .withCauseInstanceOf(expectedExceptionClass);
+
+        // metadataAutoCreationEnabled is false.
+        assertThat(lookupService.getPartitionedTopicMetadata(
+                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + 
"-partition-10"),
+                false))
+                .failsWithin(3, TimeUnit.SECONDS)
+                .withThrowableThat()
+                .withCauseInstanceOf(expectedExceptionClass);
+
+        // Verify the topic exists, and the metadataAutoCreationEnabled is 
false.
+        String nonPartitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+        
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(nonPartitionedTopic),
 false))
+                .succeedsWithin(3, TimeUnit.SECONDS)
+                .matches(n -> n.partitions == 0);
+
+        String partitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String partitionedTopicWithPartitionIndex = partitionedTopic + 
"-partition-10";
+        admin.topics().createPartitionedTopic(partitionedTopic, 20);
+        
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopic),
 false))
+                .succeedsWithin(3, TimeUnit.SECONDS)
+                .matches(n -> n.partitions == 20);
+        
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopicWithPartitionIndex),
 false))
+                .succeedsWithin(3, TimeUnit.SECONDS)
+                .matches(n -> n.partitions == 0);
+    }
+
+    @Test
+    public void testGetPartitionedTopicMedataByAdmin() throws 
PulsarAdminException {
+        String nonPartitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String partitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String partitionedTopicWithPartitionIndex = partitionedTopic + 
"-partition-10";
+        // No topic, so throw the NotFound.
+        // BTW: The admin api doesn't allow to creat the metadata of topic 
default.
+        assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics()
+                .getPartitionedTopicMetadata(nonPartitionedTopic));
+        assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics()
+                .getPartitionedTopicMetadata(partitionedTopic));
+        assertThrows(PulsarAdminException.NotFoundException.class,
+                () -> 
admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex));
+
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(nonPartitionedTopic).partitions,
 0);
+
+        admin.topics().createPartitionedTopic(partitionedTopic, 20);
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopic).partitions,
 20);
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex).partitions,
 0);
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b2c9b2b697b..32cec950a69 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -961,7 +961,9 @@ public class PulsarClientException extends IOException {
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
         // wrap an exception with new message info
-        if (t instanceof TimeoutException) {
+        if (t instanceof TopicDoesNotExistException) {
+            return new TopicDoesNotExistException(msg);
+        } else if (t instanceof TimeoutException) {
             return new TimeoutException(msg);
         } else if (t instanceof InvalidConfigurationException) {
             return new InvalidConfigurationException(msg);

Reply via email to