This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9eee8e235cb5588c1784908f4bc5ba175ef69fd4
Author: Lari Hotari <lhot...@apache.org>
AuthorDate: Wed Apr 23 13:52:27 2025 +0300

    Revert "[fix][broker] Add topic consistency check (#24118)"
    
    This reverts commit 61ab666cdb79ce3ca21fe1d7666bbd1ab5cf8670.
---
 .../pulsar/broker/namespace/NamespaceService.java  |  35 ++---
 .../pulsar/broker/service/BrokerService.java       | 150 ++++++++++-----------
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   6 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java |  20 +--
 .../broker/protocol/PulsarClientBasedHandler.java  |   7 +-
 .../broker/service/ExclusiveProducerTest.java      |  16 +--
 .../pulsar/broker/service/PersistentTopicTest.java |   8 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   6 +-
 .../nonpersistent/NonPersistentTopicTest.java      |  20 +--
 .../service/persistent/PersistentTopicTest.java    |  35 ++++-
 .../pulsar/client/api/ConsumerCreationTest.java    | 127 -----------------
 .../pulsar/client/api/ProducerCreationTest.java    |  73 ----------
 .../pulsar/client/impl/LookupServiceTest.java      |  82 -----------
 .../pulsar/client/api/PulsarClientException.java   |   4 +-
 14 files changed, 156 insertions(+), 433 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5320a50e10f..f3fb17c02fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1421,26 +1421,17 @@ public class NamespaceService implements AutoCloseable {
      * Check topic exists( partitioned or non-partitioned ).
      */
     public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName 
topic) {
-        // For non-persistent/persistent partitioned topic, which has metadata.
-        return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
-                        topic.isPartitioned() ? 
TopicName.get(topic.getPartitionedTopicName()) : topic)
-                .thenCompose(metadata -> {
-                    if (metadata.partitions > 0) {
-                        if (!topic.isPartitioned()) {
-                            return CompletableFuture.completedFuture(
-                                    
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
-                        } else {
-                            if (topic.getPartitionIndex() < 
metadata.partitions) {
-                                return CompletableFuture.completedFuture(
-                                        
TopicExistsInfo.newNonPartitionedTopicExists());
-                            }
-                        }
-                    }
-                    // Direct query the single topic.
-                    return checkNonPartitionedTopicExists(topic).thenApply(
-                            b -> b ? 
TopicExistsInfo.newNonPartitionedTopicExists() :
-                                    TopicExistsInfo.newTopicNotExists());
-                });
+        return pulsar.getBrokerService()
+            
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
+            .thenCompose(metadata -> {
+                if (metadata.partitions > 0) {
+                    return CompletableFuture.completedFuture(
+                            
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
+                }
+                return checkNonPartitionedTopicExists(topic)
+                    .thenApply(b -> b ? 
TopicExistsInfo.newNonPartitionedTopicExists()
+                            : TopicExistsInfo.newTopicNotExists());
+            });
     }
 
     /***
@@ -1461,12 +1452,12 @@ public class NamespaceService implements AutoCloseable {
      */
     public CompletableFuture<Boolean> 
checkNonPersistentNonPartitionedTopicExists(String topic) {
         TopicName topicName = TopicName.get(topic);
-        // "non-partitioned & non-persistent" topics only exist on the cache 
of the owner broker.
+        // "non-partitioned & non-persistent" topics only exist on the owner 
broker.
         return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned 
-> {
             // The current broker is the owner.
             if (isOwned) {
                CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = 
pulsar.getBrokerService()
-                       .getTopics().get(topic);
+                       .getTopic(topic, false);
                if (nonPersistentTopicFuture != null) {
                    return 
nonPersistentTopicFuture.thenApply(Optional::isPresent);
                } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 79b214def01..766af721a6b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1040,70 +1040,6 @@ public class BrokerService implements Closeable {
         return getTopic(TopicName.get(topic), createIfMissing, properties);
     }
 
-    /**
-     * Validates that the topic is consistent with its partition metadata.
-     *
-     * This method ensures the topic (partitioned or non-partitioned) correctly
-     * matches the actual partitions in the metadata. Inconsistencies typically
-     * indicate configuration issues or metadata synchronization problems.
-     *
-     * This validation is particularly important in geo-replicated 
environments where
-     * topic metadata may not be fully synchronized across all regions, 
potentially
-     * leading to access errors if not properly handled.
-     *
-     * @param topicName The topic name to validate
-     * @return CompletableFuture that completes normally if validation passes, 
or
-     * completes exceptionally with NotAllowedException if validation fails
-     */
-    private CompletableFuture<Void> validateTopicConsistency(TopicName 
topicName) {
-        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
-            // Skip validation for heartbeat namespace.
-            return CompletableFuture.completedFuture(null);
-        }
-        TopicName baseTopicName =
-                topicName.isPartitioned() ? 
TopicName.get(topicName.getPartitionedTopicName()) : topicName;
-        return fetchPartitionedTopicMetadataAsync(baseTopicName)
-                .thenCompose(metadata -> {
-                    if (topicName.isPartitioned()) {
-                        if (metadata.partitions == 0) {
-                            // Edge case: When a complete partitioned topic 
name is provided but metadata shows 0
-                            // partitions.
-                            // This indicates that the partitioned topic 
metadata doesn't exist.
-                            //
-                            // Resolution options:
-                            // 1. Creates the partitioned topic via admin API.
-                            // 2. Uses the base topic name and then rely on 
auto-creation the partitioned topic if
-                            // enabled.
-                            return FutureUtil.failedFuture(
-                                    new 
BrokerServiceException.NotAllowedException(
-                                            "Partition metadata not found for 
the partitioned topic: " + topicName));
-                        }
-                        if (topicName.getPartitionIndex() >= 
metadata.partitions) {
-                            final String errorMsg =
-                                    String.format(
-                                            "Illegal topic partition name %s 
with max allowed "
-                                                    + "%d partitions", 
topicName,
-                                            metadata.partitions);
-                            log.warn(errorMsg);
-                            return FutureUtil.failedFuture(
-                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
-                        }
-                    } else if (metadata.partitions > 0) {
-                        // Edge case: Non-partitioned topic name was provided, 
but metadata indicates this is
-                        // actually a partitioned
-                        // topic (partitions > 0).
-                        //
-                        // Resolution: Must use the complete partitioned topic 
name('topic-name-partition-N').
-                        //
-                        // This ensures proper routing to the specific 
partition and prevents ambiguity in topic
-                        // addressing.
-                        return FutureUtil.failedFuture(new 
BrokerServiceException.NotAllowedException(
-                                "Found partitioned metadata for 
non-partitioned topic: " + topicName));
-                    }
-                    return CompletableFuture.completedFuture(null);
-                });
-    }
-
     /**
      * Retrieves or creates a topic based on the specified parameters.
      * 0. If disable PersistentTopics or NonPersistentTopics, it will return a 
failed future with NotAllowedException.
@@ -1150,9 +1086,30 @@ public class BrokerService implements Closeable {
                         throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
                     }).thenCompose(optionalTopicPolicies -> {
                         final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
-                        return topics.computeIfAbsent(topicName.toString(),
-                                (tpName) -> 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties,
-                                        topicPolicies));
+                        if (topicName.isPartitioned()) {
+                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
+                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+                                    .thenCompose((metadata) -> {
+                                        // Allow creating non-partitioned 
persistent topic that name includes
+                                        // `partition`
+                                        if (metadata.partitions == 0
+                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
+                                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
+                                                    
loadOrCreatePersistentTopic(tpName,
+                                                            createIfMissing, 
properties, topicPolicies));
+                                        } else {
+                                            final String errorMsg =
+                                                    String.format("Illegal 
topic partition name %s with max allowed "
+                                                            + "%d partitions", 
topicName, metadata.partitions);
+                                            log.warn(errorMsg);
+                                            return FutureUtil.failedFuture(
+                                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
+                                        }
+                                    });
+                        } else {
+                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
+                                    loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies));
+                        }
                     });
                 });
             } else {
@@ -1166,10 +1123,29 @@ public class BrokerService implements Closeable {
                 if (!topics.containsKey(topicName.toString())) {
                     topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.BEFORE);
                 }
-                if (topicName.isPartitioned() || createIfMissing) {
+                if (topicName.isPartitioned()) {
+                    final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+                    return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
+                        if (topicName.getPartitionIndex() < 
metadata.partitions) {
+                            return 
topics.computeIfAbsent(topicName.toString(), (name) -> {
+                                topicEventsDispatcher
+                                        .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+
+                                CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
+
+                                CompletableFuture<Optional<Topic>> eventFuture 
= topicEventsDispatcher
+                                        .notifyOnCompletion(res, 
topicName.toString(), TopicEvent.CREATE);
+                                topicEventsDispatcher
+                                        .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
+                                return res;
+                            });
+                        }
+                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
+                        return 
CompletableFuture.completedFuture(Optional.empty());
+                    });
+                } else if (createIfMissing) {
                     return topics.computeIfAbsent(topicName.toString(), (name) 
-> {
-                        topicEventsDispatcher
-                                .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
 
                         CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
 
@@ -1179,13 +1155,14 @@ public class BrokerService implements Closeable {
                                 .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
                         return res;
                     });
+                } else {
+                    CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
+                    if (topicFuture == null) {
+                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
+                        topicFuture = 
CompletableFuture.completedFuture(Optional.empty());
+                    }
+                    return topicFuture;
                 }
-                CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
-                if (topicFuture == null) {
-                    topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
-                    topicFuture = 
CompletableFuture.completedFuture(Optional.empty());
-                }
-                return topicFuture;
             }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", 
topicName, e);
@@ -1341,9 +1318,8 @@ public class BrokerService implements Closeable {
             topicFuture.completeExceptionally(e);
             return topicFuture;
         }
-        checkTopicNsOwnership(topic)
-                .thenCompose((__) -> 
validateTopicConsistency(TopicName.get(topic)))
-                .thenRun(() -> {
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
             nonPersistentTopic.initialize()
                     .thenCompose(__ -> nonPersistentTopic.checkReplication())
                     .thenRun(() -> {
@@ -1360,7 +1336,17 @@ public class BrokerService implements Closeable {
                 return null;
             });
         }).exceptionally(e -> {
-            
topicFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+            log.warn("CheckTopicNsOwnership fail when 
createNonPersistentTopic! {}", topic, e.getCause());
+            // CheckTopicNsOwnership fail dont create nonPersistentTopic, when 
topic do lookup will find the correct
+            // broker. When client get non-persistent-partitioned topic
+            // metadata will the non-persistent-topic will be created.
+            // so we should add checkTopicNsOwnership logic otherwise the 
topic will be created
+            // if it dont own by this broker,we should return success
+            // otherwise it will keep retrying getPartitionedTopicMetadata
+            topicFuture.complete(Optional.of(nonPersistentTopic));
+            // after get metadata return success, we should delete this topic 
from this broker, because this topic not
+            // owner by this broker and it don't initialize and 
checkReplication
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
             return null;
         });
 
@@ -1728,8 +1714,8 @@ public class BrokerService implements Closeable {
                 : CompletableFuture.completedFuture(null);
 
         CompletableFuture<Void> isTopicAlreadyMigrated = 
checkTopicAlreadyMigrated(topicName);
-        maxTopicsCheck.thenCompose(partitionedTopicMetadata -> 
validateTopicConsistency(topicName))
-                .thenCompose(__ -> isTopicAlreadyMigrated)
+
+        maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated)
                 .thenCompose(__ -> getManagedLedgerConfig(topicName, 
topicPolicies))
         .thenAccept(managedLedgerConfig -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 4ea7105dba1..9ebeaeb7853 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2914,8 +2914,10 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() 
throws Exception {
-        // Create a topic
-        admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/ds2", 
3);
+        // Force to create a topic
+        
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 
0);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+                List.of("persistent://prop-xyz/ns1/ds2-partition-2"));
 
         // create consumer and subscription
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index e06abd972c1..c90ad15242c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -23,7 +23,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -40,7 +39,6 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.LookupTopicResult;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -103,14 +101,16 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
 
         final String partition = "persistent://" + namespaceName + 
"/test-partitioned-topi-auto-creation-partition-0";
 
-        // The Pulsar doesn't automatically create the metadata for the single 
partition, so the producer creation
-        // will fail.
-        assertThrows(NotAllowedException.class, () -> {
-            @Cleanup
-            Producer<byte[]> ignored = pulsarClient.newProducer()
-                    .topic(partition)
-                    .create();
-        });
+        producer = pulsarClient.newProducer()
+                .topic(partition)
+                .create();
+
+        partitionedTopics = 
admin.topics().getPartitionedTopicList(namespaceName);
+        topics = admin.topics().getList(namespaceName);
+        assertEquals(partitionedTopics.size(), 0);
+        assertEquals(topics.size(), 1);
+
+        producer.close();
     }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
index 3d24fe3ce38..ed9881a8cad 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
@@ -79,11 +79,9 @@ public class PulsarClientBasedHandler implements 
ProtocolHandler {
 
     @Override
     public void start(BrokerService service) {
-        @Cleanup
-        PulsarAdmin admin = null;
         try {
             final var port = 
service.getPulsar().getListenPortHTTP().orElseThrow();
-            admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:"; + 
port).build();
+            @Cleanup final var admin = 
PulsarAdmin.builder().serviceHttpUrl("http://localhost:"; + port).build();
             try {
                 admin.clusters().createCluster(cluster, ClusterData.builder()
                         .serviceUrl(service.getPulsar().getWebServiceAddress())
@@ -105,7 +103,6 @@ public class PulsarClientBasedHandler implements 
ProtocolHandler {
             throw new RuntimeException(e);
         }
         try {
-            admin.topics().createPartitionedTopic(topic, partitions);
             final var port = service.getListenPort().orElseThrow();
             client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
port).build();
             readers = new ArrayList<>();
@@ -125,7 +122,7 @@ public class PulsarClientBasedHandler implements 
ProtocolHandler {
                     });
                 }
             });
-        } catch (PulsarClientException | PulsarAdminException e) {
+        } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 33a34d3fff4..5f95e557b8c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -19,21 +19,20 @@
 package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import io.netty.util.HashedWheelTimer;
+
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
-import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -317,7 +316,6 @@ public class ExclusiveProducerTest extends BrokerTestBase {
     public void topicDeleted(String ignored, boolean partitioned) throws 
Exception {
         String topic = newTopic("persistent", partitioned);
 
-        @Cleanup
         Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
                 .accessMode(ProducerAccessMode.Exclusive)
@@ -331,14 +329,8 @@ public class ExclusiveProducerTest extends BrokerTestBase {
             admin.topics().delete(topic, true);
         }
 
-        if (!partitioned) {
-            // The producer should be able to publish again on the topic
-            p1.send("msg-2");
-        } else {
-            // The partitioned topic is deleted, the producer should not be 
able to publish again on the topic.
-            // Partitioned metadata is required to publish messages to the 
topic.
-            assertThrows(TimeoutException.class, () -> p1.send("msg-2"));
-        }
+        // The producer should be able to publish again on the topic
+        p1.send("msg-2");
     }
 
     @Test(dataProvider = "topics")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d64e36718b9..c18c2c5c5d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -121,6 +121,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
@@ -135,7 +136,7 @@ import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import 
org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore.OperationType;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -1468,6 +1469,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
 
         // create topic
+        
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .createPartitionedTopic(TopicName.get(successTopicName), new 
PartitionedTopicMetadata(2));
         PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
 
         Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
@@ -1479,7 +1482,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
 
         metadataStore.failConditional(new MetadataStoreException("injected 
error"), (op, path) ->
-                op == OperationType.EXISTS && 
path.equals("/admin/flags/policies-readonly"));
+                op == FaultInjectionMetadataStore.OperationType.PUT &&
+                        
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"));
         try {
             topic.delete().get();
             fail();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 319d9e48453..044b36992a9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -66,7 +66,7 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -1189,7 +1189,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             if (!isPartitionedTopic) {
                 fail("Topic creation should not fail without any partitioned 
topic");
             }
-            assertTrue(e.getCause() instanceof NotAllowedException);
+            assertTrue(e.getCause() instanceof NamingException);
         }
 
         // non-persistent topic test
@@ -1202,7 +1202,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             if (!isPartitionedTopic) {
                 fail("Topic creation should not fail without any partitioned 
topic");
             }
-            assertTrue(e.getCause() instanceof NotAllowedException);
+            assertTrue(e.getCause() instanceof NamingException);
         }
 
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 468cd3e4109..e2aec70fb11 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.UUID;
@@ -48,6 +44,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
 @Test(groups = "broker")
 public class NonPersistentTopicTest extends BrokerTestBase {
 
@@ -113,16 +114,19 @@ public class NonPersistentTopicTest extends 
BrokerTestBase {
     }
 
     @Test
-    public void testCreateNonExistentPartitions() throws PulsarAdminException {
+    public void testCreateNonExistentPartitions() throws PulsarAdminException, 
PulsarClientException {
         final String topicName = 
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
         admin.topics().createPartitionedTopic(topicName, 4);
         TopicName partition = TopicName.get(topicName).getPartition(4);
-        assertThrows(PulsarClientException.NotAllowedException.class, () -> {
+        try {
             @Cleanup
-            Producer<byte[]> ignored = pulsarClient.newProducer()
+            Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(partition.toString())
                     .create();
-        });
+            fail("unexpected behaviour");
+        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+
+        }
         
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 
4);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 12fe82fc954..6bbd598b454 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -34,8 +36,6 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -577,6 +577,37 @@ public class PersistentTopicTest extends BrokerTestBase {
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }
 
+    @Test
+    public void testCompatibilityWithPartitionKeyword() throws 
PulsarAdminException, PulsarClientException {
+        final String topicName = 
"persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword";
+        TopicName topicNameEntity = TopicName.get(topicName);
+        String partition2 = topicNameEntity.getPartition(2).toString();
+        // Create a non-partitioned topic with -partition- keyword
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(partition2)
+                .create();
+        List<String> topics = admin.topics().getList("prop/ns-abc");
+        // Close previous producer to simulate reconnect
+        producer.close();
+        // Disable auto topic creation
+        conf.setAllowAutoTopicCreation(false);
+        // Check the topic exist in the list.
+        Assert.assertTrue(topics.contains(partition2));
+        // Check this topic has no partition metadata.
+        Assert.assertThrows(PulsarAdminException.NotFoundException.class,
+                () -> admin.topics().getPartitionedTopicMetadata(topicName));
+        // Reconnect to the broker and expect successful because the topic has 
existed in the broker.
+        producer = pulsarClient.newProducer()
+                .topic(partition2)
+                .create();
+        producer.close();
+        // Check the topic exist in the list again.
+        Assert.assertTrue(topics.contains(partition2));
+        // Check this topic has no partition metadata again.
+        Assert.assertThrows(PulsarAdminException.NotFoundException.class,
+                () -> admin.topics().getPartitionedTopicMetadata(topicName));
+    }
+
     @Test
     public void testDeleteTopicFail() throws Exception {
         final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_"
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
deleted file mode 100644
index a81dbe02b34..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.client.api;
-
-import static org.testng.Assert.assertThrows;
-import lombok.Cleanup;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker-api")
-public class ConsumerCreationTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @DataProvider(name = "topicDomainProvider")
-    public Object[][] topicDomainProvider() {
-        return new Object[][]{
-                {TopicDomain.persistent},
-                {TopicDomain.non_persistent}
-        };
-    }
-
-    @Test(dataProvider = "topicDomainProvider")
-    public void testCreateConsumerWhenTopicTypeMismatch(TopicDomain domain)
-            throws PulsarAdminException, PulsarClientException {
-        String nonPartitionedTopic =
-                TopicName.get(domain.value(), "public", "default",
-                                
"testCreateConsumerWhenTopicTypeMismatch-nonPartitionedTopic")
-                        .toString();
-        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
-
-        // Topic type is non-partitioned, Trying to create consumer on 
partitioned topic.
-        assertThrows(NotAllowedException.class, () -> {
-            @Cleanup
-            Consumer<byte[]> ignored =
-                    
pulsarClient.newConsumer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString())
-                            .subscriptionName("my-sub").subscribe();
-        });
-
-        // Topic type is partitioned, Trying to create consumer on 
non-partitioned topic.
-        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
-                        
"testCreateConsumerWhenTopicTypeMismatch-partitionedTopic")
-                .toString();
-        admin.topics().createPartitionedTopic(partitionedTopic, 3);
-
-        // Works fine because the lookup can help our to find the correct 
topic.
-        {
-            @Cleanup
-            Consumer<byte[]> ignored =
-                    
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(2).toString())
-                            .subscriptionName("my-sub").subscribe();
-        }
-
-        // Partition index is out of range.
-        assertThrows(NotAllowedException.class, () -> {
-            @Cleanup
-            Consumer<byte[]> ignored =
-                    
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
-                            .subscriptionName("my-sub").subscribe();
-        });
-    }
-
-    @Test(dataProvider = "topicDomainProvider")
-    public void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain 
domain)
-            throws PulsarAdminException, PulsarClientException {
-        testCreateConsumerWhenSinglePartitionIsDeleted(domain, false);
-        testCreateConsumerWhenSinglePartitionIsDeleted(domain, true);
-    }
-
-    private void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain 
domain, boolean allowAutoTopicCreation)
-            throws PulsarAdminException, PulsarClientException {
-        conf.setAllowAutoTopicCreation(allowAutoTopicCreation);
-
-        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
-                        "testCreateConsumerWhenSinglePartitionIsDeleted-" + 
allowAutoTopicCreation)
-                .toString();
-        admin.topics().createPartitionedTopic(partitionedTopic, 3);
-        
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
-
-        // Non-persistent topic only have the metadata, and no partition, so 
it works fine.
-        if (allowAutoTopicCreation || 
domain.equals(TopicDomain.non_persistent)) {
-            @Cleanup
-            Consumer<byte[]> ignored =
-                    
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
-        } else {
-            
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
-                @Cleanup
-                Consumer<byte[]> ignored =
-                        
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
-            });
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index cd75383a487..d5734588288 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
-import lombok.Cleanup;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -194,74 +191,4 @@ public class ProducerCreationTest extends 
ProducerConsumerBase {
 
         
Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
     }
-
-    @Test(dataProvider = "topicDomainProvider")
-    public void testCreateProducerWhenTopicTypeMismatch(TopicDomain domain)
-            throws PulsarAdminException, PulsarClientException {
-        String nonPartitionedTopic =
-                TopicName.get(domain.value(), "public", "default",
-                                
"testCreateProducerWhenTopicTypeMismatch-nonPartitionedTopic")
-                        .toString();
-        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
-
-        // Topic type is non-partitioned, trying to create producer on the 
complete partitioned topic.
-        // Should throw NotAllowedException.
-        assertThrows(NotAllowedException.class, () -> {
-            @Cleanup
-            Producer<byte[]> ignored =
-                    
pulsarClient.newProducer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString())
-                            .create();
-        });
-
-        // Topic type is partitioned, trying to create producer on the base 
partitioned topic.
-        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
-                        
"testCreateProducerWhenTopicTypeMismatch-partitionedTopic")
-                .toString();
-        admin.topics().createPartitionedTopic(partitionedTopic, 3);
-
-        // Works fine because the lookup can help our to find all the topics.
-        {
-            @Cleanup
-            Producer<byte[]> ignored =
-                    
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartitionedTopicName())
-                            .create();
-        }
-
-        // Partition index is out of range.
-        assertThrows(NotAllowedException.class, () -> {
-            @Cleanup
-            Producer<byte[]> ignored =
-                    
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
-                            .create();
-        });
-    }
-
-    @Test(dataProvider = "topicDomainProvider")
-    public void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain 
domain)
-            throws PulsarAdminException, PulsarClientException {
-        testCreateProducerWhenSinglePartitionIsDeleted(domain, false);
-        testCreateProducerWhenSinglePartitionIsDeleted(domain, true);
-    }
-
-    private void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain 
domain, boolean allowAutoTopicCreation)
-            throws PulsarAdminException, PulsarClientException {
-        conf.setAllowAutoTopicCreation(allowAutoTopicCreation);
-
-        String partitionedTopic = TopicName.get(domain.value(), "public", 
"default",
-                        "testCreateProducerWhenSinglePartitionIsDeleted-" + 
allowAutoTopicCreation)
-                .toString();
-        admin.topics().createPartitionedTopic(partitionedTopic, 3);
-        
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
-
-        // Non-persistent topic only have the metadata, and no partition, so 
it works fine.
-        if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) {
-            @Cleanup
-            Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
-        } else {
-            
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
-                @Cleanup
-                Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
-            });
-        }
-    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
index c4ef53b292b..59cb7ae03d0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
@@ -19,19 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import static 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.AfterClass;
@@ -131,80 +125,4 @@ public class LookupServiceTest extends 
ProducerConsumerBase {
         admin.topics().delete(nonPartitionedTopic, false);
     }
 
-    @Test(dataProvider = "isUsingHttpLookup")
-    public void testGetPartitionedTopicMetadataByPulsarClient(boolean 
isUsingHttpLookup) throws PulsarAdminException {
-        LookupService lookupService = getLookupService(isUsingHttpLookup);
-
-        // metadataAutoCreationEnabled is true.
-        assertThat(lookupService.getPartitionedTopicMetadata(
-                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), 
true))
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .matches(n -> n.partitions == 0);
-
-        // metadataAutoCreationEnabled is true.
-        // Allow the get the metadata of single partition topic, because the 
auto-creation is enabled.
-        // But the producer/consumer is unavailable because the topic doesn't 
have the metadata.
-        assertThat(lookupService.getPartitionedTopicMetadata(
-                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + 
"-partition-10"),
-                true))
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .matches(n -> n.partitions == 0);
-
-        Class<? extends Throwable> expectedExceptionClass =
-                isUsingHttpLookup ? 
PulsarClientException.NotFoundException.class :
-                        PulsarClientException.TopicDoesNotExistException.class;
-        // metadataAutoCreationEnabled is false.
-        assertThat(lookupService.getPartitionedTopicMetadata(
-                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), 
false))
-                .failsWithin(3, TimeUnit.SECONDS)
-                .withThrowableThat()
-                .withCauseInstanceOf(expectedExceptionClass);
-
-        // metadataAutoCreationEnabled is false.
-        assertThat(lookupService.getPartitionedTopicMetadata(
-                
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + 
"-partition-10"),
-                false))
-                .failsWithin(3, TimeUnit.SECONDS)
-                .withThrowableThat()
-                .withCauseInstanceOf(expectedExceptionClass);
-
-        // Verify the topic exists, and the metadataAutoCreationEnabled is 
false.
-        String nonPartitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
-        
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(nonPartitionedTopic),
 false))
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .matches(n -> n.partitions == 0);
-
-        String partitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        String partitionedTopicWithPartitionIndex = partitionedTopic + 
"-partition-10";
-        admin.topics().createPartitionedTopic(partitionedTopic, 20);
-        
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopic),
 false))
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .matches(n -> n.partitions == 20);
-        
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopicWithPartitionIndex),
 false))
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .matches(n -> n.partitions == 0);
-    }
-
-    @Test
-    public void testGetPartitionedTopicMedataByAdmin() throws 
PulsarAdminException {
-        String nonPartitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        String partitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        String partitionedTopicWithPartitionIndex = partitionedTopic + 
"-partition-10";
-        // No topic, so throw the NotFound.
-        // BTW: The admin api doesn't allow to creat the metadata of topic 
default.
-        assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics()
-                .getPartitionedTopicMetadata(nonPartitionedTopic));
-        assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics()
-                .getPartitionedTopicMetadata(partitionedTopic));
-        assertThrows(PulsarAdminException.NotFoundException.class,
-                () -> 
admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex));
-
-        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
-        
assertEquals(admin.topics().getPartitionedTopicMetadata(nonPartitionedTopic).partitions,
 0);
-
-        admin.topics().createPartitionedTopic(partitionedTopic, 20);
-        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopic).partitions,
 20);
-        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex).partitions,
 0);
-    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 023ac1263b7..5bb88bcbd0a 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -992,9 +992,7 @@ public class PulsarClientException extends IOException {
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
         // wrap an exception with new message info
-        if (t instanceof TopicDoesNotExistException) {
-            return new TopicDoesNotExistException(msg);
-        } else if (t instanceof TimeoutException) {
+        if (t instanceof TimeoutException) {
             return new TimeoutException(msg);
         } else if (t instanceof InvalidConfigurationException) {
             return new InvalidConfigurationException(msg);

Reply via email to