This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6b84514bfab [fix][broker] Fix namespace bundle stuck in unloading 
status (#21445) (#21565)
6b84514bfab is described below

commit 6b84514bfabcbe1d07d67c8a734fb238ce01e89c
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Nov 14 21:46:06 2023 +0800

    [fix][broker] Fix namespace bundle stuck in unloading status (#21445) 
(#21565)
    
    Cherry-pick #21445 to branch-3.0
---
 .../pulsar/broker/service/BrokerService.java       | 372 +++++++++++----------
 .../SystemTopicBasedTopicPoliciesService.java      |  19 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java |   8 +-
 .../broker/namespace/NamespaceUnloadingTest.java   |  29 ++
 .../pulsar/broker/service/BrokerServiceTest.java   |   2 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  29 --
 .../pulsar/client/cli/PulsarClientToolTest.java    |   3 +
 7 files changed, 247 insertions(+), 215 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2d51ef6ef30..4d02e7e8773 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
@@ -70,6 +71,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -1051,19 +1053,32 @@ public class BrokerService implements Closeable {
             }
             final boolean isPersistentTopic = 
topicName.getDomain().equals(TopicDomain.persistent);
             if (isPersistentTopic) {
-                return topics.computeIfAbsent(topicName.toString(), (tpName) 
-> {
-                    if (topicName.isPartitioned()) {
-                        return 
fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
-                                .thenCompose((metadata) -> {
-                                    // Allow crate non-partitioned persistent 
topic that name includes `partition`
-                                    if (metadata.partitions == 0
-                                            || topicName.getPartitionIndex() < 
metadata.partitions) {
-                                        return 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties);
-                                    }
-                                    return 
CompletableFuture.completedFuture(Optional.empty());
-                                });
-                    }
-                    return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties);
+                final CompletableFuture<Optional<TopicPolicies>> 
topicPoliciesFuture =
+                        getTopicPoliciesBypassSystemTopic(topicName);
+                return topicPoliciesFuture.exceptionally(ex -> {
+                    final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
+                    final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
+                            + " topic policies service. topic_name=%s 
error_message=%s", topicName, rc.getMessage());
+                    log.error(errorInfo, rc);
+                    throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
+                }).thenCompose(optionalTopicPolicies -> {
+                    final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
+                    return topics.computeIfAbsent(topicName.toString(), 
(tpName) -> {
+                        if (topicName.isPartitioned()) {
+                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
+                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+                                    .thenCompose((metadata) -> {
+                                        // Allow crate non-partitioned 
persistent topic that name includes `partition`
+                                        if (metadata.partitions == 0
+                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
+                                            return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
+                                                    properties, topicPolicies);
+                                        }
+                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                    });
+                        }
+                        return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
+                    });
                 });
             } else {
                 return topics.computeIfAbsent(topicName.toString(), (name) -> {
@@ -1117,6 +1132,18 @@ public class BrokerService implements Closeable {
         }
     }
 
+    private CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
+        Objects.requireNonNull(topicName);
+        final ServiceConfiguration serviceConfiguration = 
pulsar.getConfiguration();
+        if (serviceConfiguration.isSystemTopicEnabled() && 
serviceConfiguration.isTopicLevelPoliciesEnabled()
+                && 
!NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
+                && 
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+            return 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+        } else {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+    }
+
     public CompletableFuture<Void> deleteTopic(String topic, boolean 
forceDelete) {
         topicEventsDispatcher.notify(topic, TopicEvent.DELETE, 
EventStage.BEFORE);
         CompletableFuture<Void> result =  deleteTopicInternal(topic, 
forceDelete);
@@ -1521,7 +1548,7 @@ public class BrokerService implements Closeable {
      * @throws RuntimeException
      */
     protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic,
-            boolean createIfMissing, Map<String, String> properties) throws 
RuntimeException {
+            boolean createIfMissing, Map<String, String> properties, @Nullable 
TopicPolicies topicPolicies) {
         final CompletableFuture<Optional<Topic>> topicFuture = 
FutureUtil.createFutureWithTimeout(
                 
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), 
executor(),
                 () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
@@ -1539,7 +1566,8 @@ public class BrokerService implements Closeable {
                     final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
 
                     if (topicLoadSemaphore.tryAcquire()) {
-                        checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture, properties);
+                        checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture,
+                                properties, topicPolicies);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1547,7 +1575,8 @@ public class BrokerService implements Closeable {
                             return null;
                         });
                     } else {
-                        pendingTopicLoadingQueue.add(new 
TopicLoadingContext(topic, topicFuture, properties));
+                        pendingTopicLoadingQueue.add(new 
TopicLoadingContext(topic,
+                                topicFuture, properties, topicPolicies));
                         if (log.isDebugEnabled()) {
                             log.debug("topic-loading for {} added into pending 
queue", topic);
                         }
@@ -1588,7 +1617,7 @@ public class BrokerService implements Closeable {
 
     private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties) {
+                                       Map<String, String> properties, 
@Nullable TopicPolicies topicPolicies) {
         TopicName topicName = TopicName.get(topic);
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
@@ -1602,7 +1631,8 @@ public class BrokerService implements Closeable {
                         }
                         propertiesFuture.thenAccept(finalProperties ->
                                 //TODO add topicName in properties?
-                                createPersistentTopic(topic, createIfMissing, 
topicFuture, finalProperties)
+                                createPersistentTopic(topic, createIfMissing, 
topicFuture,
+                                        finalProperties, topicPolicies)
                         ).exceptionally(throwable -> {
                             log.warn("[{}] Read topic property failed", topic, 
throwable);
                             pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
@@ -1627,12 +1657,12 @@ public class BrokerService implements Closeable {
     public void createPersistentTopic0(final String topic, boolean 
createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
-        createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+        createPersistentTopic(topic, createIfMissing, topicFuture, properties, 
null);
     }
 
     private void createPersistentTopic(final String topic, boolean 
createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties) {
+                                       Map<String, String> properties, 
@Nullable TopicPolicies topicPolicies) {
         TopicName topicName = TopicName.get(topic);
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
@@ -1648,7 +1678,8 @@ public class BrokerService implements Closeable {
                 ? checkMaxTopicsPerNamespace(topicName, 1)
                 : CompletableFuture.completedFuture(null);
 
-        maxTopicsCheck.thenCompose(__ -> 
getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> {
+        maxTopicsCheck.thenCompose(__ ->
+                        getManagedLedgerConfig(topicName, 
topicPolicies)).thenAccept(managedLedgerConfig -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
                 // init managedLedger interceptor
                 Set<BrokerEntryMetadataInterceptor> interceptors = new 
HashSet<>();
@@ -1774,170 +1805,167 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName) {
+        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
+                getTopicPoliciesBypassSystemTopic(topicName);
+        return topicPoliciesFuture.thenCompose(optionalTopicPolicies ->
+                getManagedLedgerConfig(topicName, 
optionalTopicPolicies.orElse(null)));
+    }
+
+    private CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName,
+                                                                          
@Nullable TopicPolicies topicPolicies) {
         requireNonNull(topicName);
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = 
pulsar.getPulsarResources().getLocalPolicies();
-        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
-        if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
-            && !NamespaceService.isSystemServiceNamespace(namespace.toString())
-            && 
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
-            topicPoliciesFuture = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
-        } else {
-            topicPoliciesFuture = 
CompletableFuture.completedFuture(Optional.empty());
-        }
-        return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
-            final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
-            final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
-            return nsPolicies.thenCombine(lcPolicies, (policies, 
localPolicies) -> {
-                PersistencePolicies persistencePolicies = null;
-                RetentionPolicies retentionPolicies = null;
-                OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-                if (topicPoliciesOptional.isPresent()) {
-                    final TopicPolicies topicPolicies = 
topicPoliciesOptional.get();
-                    persistencePolicies = topicPolicies.getPersistence();
-                    retentionPolicies = topicPolicies.getRetentionPolicies();
-                    topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
-                }
-
-                if (persistencePolicies == null) {
-                    persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
-                            () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                    
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                    
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                    
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-                }
+        final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
+        final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
+        return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> 
{
+            PersistencePolicies persistencePolicies = null;
+            RetentionPolicies retentionPolicies = null;
+            OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+            if (topicPolicies != null) {
+                persistencePolicies = topicPolicies.getPersistence();
+                retentionPolicies = topicPolicies.getRetentionPolicies();
+                topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+            }
 
-                if (retentionPolicies == null) {
-                    retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-                            () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                    
serviceConfig.getDefaultRetentionSizeInMB())
-                    );
-                }
+            if (persistencePolicies == null) {
+                persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
+                        () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+            }
 
-                ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
-                
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-                
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-                
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+            if (retentionPolicies == null) {
+                retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+                        () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                serviceConfig.getDefaultRetentionSizeInMB())
+                );
+            }
 
-                if (serviceConfig.isStrictBookieAffinityEnabled()) {
+            ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
+            
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+            
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+            
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+            if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+                        IsolatedBookieEnsemblePlacementPolicy.class);
+                if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
+                    Map<String, Object> properties = new HashMap<>();
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                            
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                            
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                } else if (isSystemTopic(topicName)) {
+                    Map<String, Object> properties = new HashMap<>();
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"*");
+                    properties.put(IsolatedBookieEnsemblePlacementPolicy
+                            .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                } else {
+                    Map<String, Object> properties = new HashMap<>();
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"");
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
+                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                }
+            } else {
+                if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
                     
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
                             IsolatedBookieEnsemblePlacementPolicy.class);
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    } else if (isSystemTopic(topicName)) {
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"*");
-                        properties.put(IsolatedBookieEnsemblePlacementPolicy
-                                .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    } else {
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"");
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    }
-                } else {
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
-                                IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    }
+                    Map<String, Object> properties = new HashMap<>();
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                            
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                            
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
                 }
+            }
 
-                
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-                
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-                
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-                managedLedgerConfig
-                        
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-                
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
-                        
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
-                
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
-                        
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
-                
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-                managedLedgerConfig
-                        
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                                TimeUnit.MINUTES);
-                managedLedgerConfig
-                        
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                                TimeUnit.MINUTES);
-                
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                        
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-                managedLedgerConfig
-                        
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-                managedLedgerConfig
-                        
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-                
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                        
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-                
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-                
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-                managedLedgerConfig
-                        
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-                managedLedgerConfig
-                        
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-                managedLedgerConfig
-                        
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
-                
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-                
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-                
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-                managedLedgerConfig.setInactiveLedgerRollOverTime(
-                        
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
-                managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
-                        serviceConfig.isCacheEvictionByMarkDeletedPosition());
-                managedLedgerConfig.setMinimumBacklogCursorsForCaching(
-                        
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
-                managedLedgerConfig.setMinimumBacklogEntriesForCaching(
-                        
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
-                managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
-                        
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
-                OffloadPoliciesImpl nsLevelOffloadPolicies =
-                        (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
-                OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
-                        topicLevelOffloadPolicies,
-                        
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
-                        getPulsar().getConfig().getProperties());
-                if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                    
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
-                } else  {
-                    if (topicLevelOffloadPolicies != null) {
-                        try {
-                            LedgerOffloader topicLevelLedgerOffLoader =
-                                    
pulsar().createManagedLedgerOffloader(offloadPolicies);
-                            
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                        } catch (PulsarServerException e) {
-                            throw new RuntimeException(e);
-                        }
-                    } else {
-                        //If the topic level policy is null, use the namespace 
level
-                        managedLedgerConfig
-                                
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
+            
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+            
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+            
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+            managedLedgerConfig
+                    
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+            
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+                    
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+            managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+                    
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+            
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+            managedLedgerConfig
+                    
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                            TimeUnit.MINUTES);
+            managedLedgerConfig
+                    
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                            TimeUnit.MINUTES);
+            
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                    
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+            managedLedgerConfig
+                    
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+            managedLedgerConfig
+                    
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+            
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+            managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                    
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+            
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+            
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+            managedLedgerConfig
+                    
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+            managedLedgerConfig
+                    
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+            managedLedgerConfig
+                    
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
+            
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+            
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+            
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+            managedLedgerConfig.setInactiveLedgerRollOverTime(
+                    
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
+            managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+                    serviceConfig.isCacheEvictionByMarkDeletedPosition());
+            managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+                    
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+            managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+                    
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+            managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+                    
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+            OffloadPoliciesImpl nsLevelOffloadPolicies =
+                    (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
+            OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
+                    topicLevelOffloadPolicies,
+                    
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
+                    getPulsar().getConfig().getProperties());
+            if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+            } else  {
+                if (topicLevelOffloadPolicies != null) {
+                    try {
+                        LedgerOffloader topicLevelLedgerOffLoader =
+                                
pulsar().createManagedLedgerOffloader(offloadPolicies);
+                        
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                    } catch (PulsarServerException e) {
+                        throw new RuntimeException(e);
                     }
+                } else {
+                    //If the topic level policy is null, use the namespace 
level
+                    managedLedgerConfig
+                            
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
                 }
+            }
 
-                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-                        
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-                        
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-                return managedLedgerConfig;
-            });
+            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                    serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+            managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                    
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+            return managedLedgerConfig;
         });
     }
 
@@ -3043,7 +3071,10 @@ public class BrokerService implements Closeable {
             CompletableFuture<Optional<Topic>> pendingFuture = 
pendingTopic.getTopicFuture();
             final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
-            checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, 
pendingTopic.getProperties());
+            checkOwnershipAndCreatePersistentTopic(topic,
+                    true,
+                    pendingFuture,
+                    pendingTopic.getProperties(), 
pendingTopic.getTopicPolicies());
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
@@ -3614,5 +3645,6 @@ public class BrokerService implements Closeable {
         private final String topic;
         private final CompletableFuture<Optional<Topic>> topicFuture;
         private final Map<String, String> properties;
+        private final TopicPolicies topicPolicies;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index da312340954..80fecbe67b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -29,7 +29,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -43,10 +42,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
-import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.PulsarEvent;
@@ -320,7 +317,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         requireNonNull(namespace);
         return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
             final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                    createSystemTopicClientWithRetry(namespace);
+                    createSystemTopicClient(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
             final CompletableFuture<Void> initFuture = readerCompletableFuture
@@ -346,20 +343,16 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClientWithRetry(
+    protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClient(
             NamespaceName namespace) {
-        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new 
CompletableFuture<>();
         try {
             createSystemTopicFactoryIfNeeded();
-        } catch (PulsarServerException e) {
-            result.completeExceptionally(e);
-            return result;
+        } catch (PulsarServerException ex) {
+            return FutureUtil.failedFuture(ex);
         }
-        SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+        final SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
                 .createTopicPoliciesSystemTopicClient(namespace);
-        Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, 
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
-        RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, 
backoff, pulsarService.getExecutor(), result);
-        return result;
+        return systemTopicClient.newReaderAsync();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index d447787d1b7..590edc2d3f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -135,6 +135,8 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
                         new InetSocketAddress(pulsar.getAdvertisedAddress(), 
pulsar.getBrokerListenPort().get());
                 return 
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
             });
+            final String topicPoliciesServiceInitException
+                    = "Topic creation encountered an exception by initialize 
topic policies service";
 
             // Creating a producer and creating a Consumer may trigger 
automatic topic
             // creation, let's try to create a Producer and a Consumer
@@ -145,7 +147,8 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
             } catch (PulsarClientException.LookupException expected) {
                 String msg = "Namespace bundle for topic (%s) not served by 
this instance";
                 log.info("Expected error", expected);
-                assertTrue(expected.getMessage().contains(String.format(msg, 
topic)));
+                assertTrue(expected.getMessage().contains(String.format(msg, 
topic))
+                        || 
expected.getMessage().contains(topicPoliciesServiceInitException));
             }
 
             try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
@@ -155,7 +158,8 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
             } catch (PulsarClientException.LookupException expected) {
                 String msg = "Namespace bundle for topic (%s) not served by 
this instance";
                 log.info("Expected error", expected);
-                assertTrue(expected.getMessage().contains(String.format(msg, 
topic)));
+                assertTrue(expected.getMessage().contains(String.format(msg, 
topic))
+                        || 
expected.getMessage().contains(topicPoliciesServiceInitException));
             }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
index 0dbfe176087..1526611874a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
@@ -22,8 +22,12 @@ import static org.testng.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
 
+import lombok.Cleanup;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -34,6 +38,9 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setForceDeleteNamespaceAllowed(true);
+        conf.setTopicLoadTimeoutSeconds(Integer.MAX_VALUE);
         super.baseSetup();
     }
 
@@ -68,4 +75,26 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
         producer.close();
     }
 
+    @Test
+    public void testUnloadWithTopicCreation() throws PulsarAdminException, 
PulsarClientException {
+        final String namespaceName = "prop/ns_unloading";
+        final String topicName = 
"persistent://prop/ns_unloading/with_topic_creation";
+        final int partitions = 5;
+        admin.namespaces().createNamespace(namespaceName, 1);
+        admin.topics().createPartitionedTopic(topicName, partitions);
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+
+        for (int i = 0; i < 100; i++) {
+            admin.namespaces().unloadNamespaceBundle(namespaceName, 
"0x00000000_0xffffffff");
+        }
+
+        for (int i = 0; i < partitions; i++) {
+            producer.send(i);
+        }
+        admin.namespaces().deleteNamespace(namespaceName, true);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 2489aa5f026..57bf3a2a866 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1152,7 +1152,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // try to create topic which should fail as bundle is disable
         CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
-                .loadOrCreatePersistentTopic(topicName, true, null);
+                .loadOrCreatePersistentTopic(topicName, true, null, null);
 
         try {
             futureResult.get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 5b70ff99675..ba5e42867d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
@@ -43,11 +40,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
-import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.common.events.PulsarEvent;
@@ -56,7 +50,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
@@ -321,28 +314,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         assertTrue("actual:" + cost, cost >= 5000 - 1000);
     }
 
-    @Test
-    public void testCreatSystemTopicClientWithRetry() throws Exception {
-        SystemTopicBasedTopicPoliciesService service =
-                spy((SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService());
-        Field field = SystemTopicBasedTopicPoliciesService.class
-                .getDeclaredField("namespaceEventsSystemTopicFactory");
-        field.setAccessible(true);
-        NamespaceEventsSystemTopicFactory factory = 
spy((NamespaceEventsSystemTopicFactory) field.get(service));
-        SystemTopicClient<PulsarEvent> client = 
mock(TopicPoliciesSystemTopicClient.class);
-        
doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any());
-        field.set(service, factory);
-
-        SystemTopicClient.Reader<PulsarEvent> reader = 
mock(SystemTopicClient.Reader.class);
-        // Throw an exception first, create successfully after retrying
-        doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-                
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();
-
-        SystemTopicClient.Reader<PulsarEvent> reader1 = 
service.createSystemTopicClientWithRetry(null).get();
-
-        assertEquals(reader1, reader);
-    }
-
     @Test
     public void testGetTopicPoliciesWithRetry() throws Exception {
         Field initMapField = 
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 8d416125fd1..acda1b6f6a5 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -111,6 +111,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("non-durable");
+        admin.topics().createNonPartitionedTopic(topicName);
 
         int numberOfMessages = 10;
         @Cleanup("shutdownNow")
@@ -202,6 +203,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("reader");
+        admin.topics().createNonPartitionedTopic(topicName);
 
         int numberOfMessages = 10;
         @Cleanup("shutdownNow")
@@ -251,6 +253,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("encryption");
+        admin.topics().createNonPartitionedTopic(topicName);
         final String keyUriBase = 
"file:../pulsar-broker/src/test/resources/certificate/";
         final int numberOfMessages = 10;
 

Reply via email to