This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new afc92440582 [fix][broker] Fix inconsistent topic policy (#21231)
afc92440582 is described below

commit afc924405829f23962e04a717571f33930c165d6
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Sep 26 10:33:35 2023 +0800

    [fix][broker] Fix inconsistent topic policy (#21231)
---
 .../ProxySaslAuthenticationTest.java               |  13 +-
 .../authentication/SaslAuthenticateTest.java       |  11 +-
 .../pulsar/broker/service/BrokerService.java       | 295 +++++++++++----------
 .../SystemTopicBasedTopicPoliciesService.java      | 100 +++++--
 .../broker/service/TopicPoliciesService.java       |  41 +++
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   1 +
 .../pulsar/broker/admin/TopicPoliciesTest.java     |   2 +-
 .../admin/TopicPoliciesWithBrokerRestartTest.java  | 104 ++++++++
 .../apache/pulsar/broker/admin/TopicsAuthTest.java |   2 +
 .../apache/pulsar/broker/auth/AuthLogsTest.java    |   2 +
 .../pulsar/broker/auth/MockAuthentication.java     |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  19 ++
 .../broker/service/BrokerBookieIsolationTest.java  |   3 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |   6 +-
 .../service/persistent/PersistentTopicTest.java    |   3 +-
 .../AuthenticationTlsHostnameVerificationTest.java |   3 +
 .../api/AuthorizationProducerConsumerTest.java     |   5 +
 .../client/api/MutualAuthenticationTest.java       |   2 +-
 .../TokenAuthenticatedProducerConsumerTest.java    |   3 +
 ...kenOauth2AuthenticatedProducerConsumerTest.java |  14 +-
 ...eyStoreTlsProducerConsumerTestWithAuthTest.java |  22 ++
 .../impl/PatternTopicsConsumerImplAuthTest.java    |   1 +
 .../configurations/standalone_no_client_auth.conf  |   3 +-
 .../proxy/server/ProxyAuthenticationTest.java      |   2 +-
 .../proxy/server/ProxyForwardAuthDataTest.java     |   2 +-
 .../proxy/server/ProxyRolesEnforcementTest.java    |   2 +-
 .../server/ProxyWithAuthorizationNegTest.java      |   2 +
 .../apache/pulsar/sql/presto/TestPulsarAuth.java   |   4 +
 .../loadbalance/ExtensibleLoadManagerTest.java     |   1 +
 .../integration/presto/TestPulsarSQLAuth.java      |   1 +
 30 files changed, 478 insertions(+), 197 deletions(-)

diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index 261efe680f8..f0e45aa734a 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
 import org.slf4j.Logger;
@@ -193,15 +194,17 @@ public class ProxySaslAuthenticationTest extends 
ProducerConsumerBase {
                conf.setAuthenticationProviders(providers);
                conf.setClusterName("test");
                conf.setSuperUserRoles(ImmutableSet.of("client/" + 
localHostname + "@" + kdc.getRealm()));
-
-               super.init();
-
-               lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-
                // set admin auth, to verify admin web resources
                Map<String, String> clientSaslConfig = new HashMap<>();
                clientSaslConfig.put("saslJaasClientSectionName", 
"PulsarClient");
                clientSaslConfig.put("serverType", "broker");
+               
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+               conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+                               
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
+               super.init();
+
+               lookupUrl = new URI(pulsar.getBrokerServiceUrl());
                log.info("set client jaas section name: PulsarClient");
                admin = PulsarAdmin.builder()
                        .serviceHttpUrl(brokerUrl.toString())
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 5cace2221de..230c2ad787d 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -186,7 +187,12 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         conf.setAuthenticationProviders(providers);
         conf.setClusterName("test");
         conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + 
kdc.getRealm()));
-
+        Map<String, String> clientSaslConfig = new HashMap<>();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+                
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
         super.init();
 
         lookupUrl = new URI(pulsar.getWebServiceAddress());
@@ -197,9 +203,6 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
             .authentication(authSasl));
 
         // set admin auth, to verify admin web resources
-        Map<String, String> clientSaslConfig = new HashMap<>();
-        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
-        clientSaslConfig.put("serverType", "broker");
         log.info("set client jaas section name: PulsarClient");
         admin = PulsarAdmin.builder()
             .serviceHttpUrl(brokerUrl.toString())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 391affef1da..388d9de84f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1760,165 +1760,172 @@ public class BrokerService implements Closeable {
         });
     }
 
-    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(TopicName topicName) {
+    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName) {
+        requireNonNull(topicName);
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = 
pulsar.getPulsarResources().getLocalPolicies();
-        return nsr.getPoliciesAsync(namespace)
-                .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, 
localPolicies) -> {
-                    PersistencePolicies persistencePolicies = null;
-                    RetentionPolicies retentionPolicies = null;
-                    OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
-                            && 
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                        final TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService()
-                                .getTopicPoliciesIfExists(topicName);
-                        if (topicPolicies != null) {
-                            persistencePolicies = 
topicPolicies.getPersistence();
-                            retentionPolicies = 
topicPolicies.getRetentionPolicies();
-                            topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
-                        }
-                    }
-
-                    if (persistencePolicies == null) {
-                        persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
-                                () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                        
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                        
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                        
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-                    }
+        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+        if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+            && !NamespaceService.isSystemServiceNamespace(namespace.toString())
+            && 
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+            topicPoliciesFuture = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+        } else {
+            topicPoliciesFuture = 
CompletableFuture.completedFuture(Optional.empty());
+        }
+        return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
+            final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
+            final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
+            return nsPolicies.thenCombine(lcPolicies, (policies, 
localPolicies) -> {
+                PersistencePolicies persistencePolicies = null;
+                RetentionPolicies retentionPolicies = null;
+                OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+                if (topicPoliciesOptional.isPresent()) {
+                    final TopicPolicies topicPolicies = 
topicPoliciesOptional.get();
+                    persistencePolicies = topicPolicies.getPersistence();
+                    retentionPolicies = topicPolicies.getRetentionPolicies();
+                    topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
+                }
 
-                    if (retentionPolicies == null) {
-                        retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-                                () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                        
serviceConfig.getDefaultRetentionSizeInMB())
-                        );
-                    }
+                if (persistencePolicies == null) {
+                    persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
+                            () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                    
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                    
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                    
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+                }
 
-                    ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
-                    
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-                    
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-                    
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+                if (retentionPolicies == null) {
+                    retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+                            () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    
serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
 
-                    if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
+                
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+                
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+                
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+                if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+                            IsolatedBookieEnsemblePlacementPolicy.class);
+                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
+                        Map<String, Object> properties = new HashMap<>();
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    } else if (isSystemTopic(topicName)) {
+                        Map<String, Object> properties = new HashMap<>();
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"*");
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy
+                                .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    } else {
+                        Map<String, Object> properties = new HashMap<>();
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"");
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
+                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    }
+                } else {
+                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
                         
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
                                 IsolatedBookieEnsemblePlacementPolicy.class);
-                        if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                            Map<String, Object> properties = new HashMap<>();
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        } else if (isSystemTopic(topicName)) {
-                            Map<String, Object> properties = new HashMap<>();
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"*");
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy
-                                    .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
-                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        } else {
-                            Map<String, Object> properties = new HashMap<>();
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"");
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
-                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        }
-                    } else {
-                        if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
-                                            
IsolatedBookieEnsemblePlacementPolicy.class);
-                            Map<String, Object> properties = new HashMap<>();
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        }
+                        Map<String, Object> properties = new HashMap<>();
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
                     }
+                }
 
-                    
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-                    
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-                    
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-                    managedLedgerConfig
-                            
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-                    
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
-                            
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
-                    
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
-                            
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
-                    
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-                    managedLedgerConfig
-                            
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                                    TimeUnit.MINUTES);
-                    managedLedgerConfig
-                            
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                                    TimeUnit.MINUTES);
-                    
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-                    managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                            
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-                    managedLedgerConfig
-                            
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-                    managedLedgerConfig
-                            
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-                    
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-                    managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                            
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-                    
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-                    
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-                    managedLedgerConfig
-                            
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-                    managedLedgerConfig
-                            
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-                    managedLedgerConfig
-                            
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
-                    
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-                    
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-                    
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-                    managedLedgerConfig.setInactiveLedgerRollOverTime(
-                            
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
-                    managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
-                            
serviceConfig.isCacheEvictionByMarkDeletedPosition());
-                    managedLedgerConfig.setMinimumBacklogCursorsForCaching(
-                            
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
-                    managedLedgerConfig.setMinimumBacklogEntriesForCaching(
-                            
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
-                    managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
-                            
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
-                    OffloadPoliciesImpl nsLevelOffloadPolicies =
-                            (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
-                    OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
-                            topicLevelOffloadPolicies,
-                            
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
-                            getPulsar().getConfig().getProperties());
-                    if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                        
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
-                    } else  {
-                        if (topicLevelOffloadPolicies != null) {
-                            try {
-                                LedgerOffloader topicLevelLedgerOffLoader =
-                                        
pulsar().createManagedLedgerOffloader(offloadPolicies);
-                                
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                            } catch (PulsarServerException e) {
-                                throw new RuntimeException(e);
-                            }
-                        } else {
-                            //If the topic level policy is null, use the 
namespace level
-                            managedLedgerConfig
-                                    
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
+                
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+                
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+                
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+                managedLedgerConfig
+                        
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+                
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+                        
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+                
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+                        
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+                
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+                managedLedgerConfig
+                        
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                                TimeUnit.MINUTES);
+                managedLedgerConfig
+                        
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                                TimeUnit.MINUTES);
+                
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                        
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+                managedLedgerConfig
+                        
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+                managedLedgerConfig
+                        
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+                
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                        
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+                
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+                
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+                managedLedgerConfig
+                        
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+                managedLedgerConfig
+                        
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+                managedLedgerConfig
+                        
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
+                
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+                
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+                
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+                managedLedgerConfig.setInactiveLedgerRollOverTime(
+                        
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
+                managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+                        serviceConfig.isCacheEvictionByMarkDeletedPosition());
+                managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+                        
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+                managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+                        
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+                managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+                        
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+                OffloadPoliciesImpl nsLevelOffloadPolicies =
+                        (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
+                OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
+                        topicLevelOffloadPolicies,
+                        
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
+                        getPulsar().getConfig().getProperties());
+                if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                    
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+                } else  {
+                    if (topicLevelOffloadPolicies != null) {
+                        try {
+                            LedgerOffloader topicLevelLedgerOffLoader =
+                                    
pulsar().createManagedLedgerOffloader(offloadPolicies);
+                            
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                        } catch (PulsarServerException e) {
+                            throw new RuntimeException(e);
                         }
+                    } else {
+                        //If the topic level policy is null, use the namespace 
level
+                        managedLedgerConfig
+                                
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
                     }
+                }
 
-                    managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-                            
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-                    managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-                            
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-                    return managedLedgerConfig;
-                });
+                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                        
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                        
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+                return managedLedgerConfig;
+            });
+        });
     }
 
     private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 09f8de818db..ed76d37ae25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -54,6 +56,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +81,8 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 
     private final Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
             readerCaches = new ConcurrentHashMap<>();
-    @VisibleForTesting
-    final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
+
+    final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new 
ConcurrentHashMap<>();
 
     @VisibleForTesting
     final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = 
new ConcurrentHashMap<>();
@@ -219,12 +222,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                           boolean isGlobal) throws 
TopicPoliciesCacheNotInitException {
         if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
             NamespaceName namespace = topicName.getNamespaceObject();
-            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+            prepareInitPoliciesCacheAsync(namespace);
         }
 
         MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result 
= new MutablePair<>();
         policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, 
initialized) -> {
-            if (initialized == null || !initialized) {
+            if (initialized == null || !initialized.isDone()) {
                 result.setLeft(new TopicPoliciesCacheNotInitException());
             } else {
                 TopicPolicies topicPolicies =
@@ -242,6 +245,34 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
+    @NotNull
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName,
+                                                                            
boolean isGlobal) {
+        requireNonNull(topicName);
+        final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+        return preparedFuture.thenApply(__ -> {
+            final TopicPolicies candidatePolicies = isGlobal
+                    ? 
globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+                    : 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+            return Optional.ofNullable(candidatePolicies);
+        });
+    }
+
+    @NotNull
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName) {
+        requireNonNull(topicName);
+        final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+        return preparedFuture.thenApply(__ -> {
+            final TopicPolicies localPolicies = 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+            if (localPolicies != null) {
+                return Optional.of(localPolicies);
+            }
+            return 
Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
+        });
+    }
+
     @Override
     public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
         return 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -265,39 +296,48 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
         if (NamespaceService.isHeartbeatNamespace(namespace)) {
-            result.complete(null);
-            return result;
+            return CompletableFuture.completedFuture(null);
         }
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
-                result.complete(null);
+                return CompletableFuture.completedFuture(null);
             } else {
-                prepareInitPoliciesCache(namespace, result);
+                return prepareInitPoliciesCacheAsync(namespace);
             }
         }
-        return result;
     }
 
-    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, 
CompletableFuture<Void> result) {
-        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
-            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+    private @Nonnull CompletableFuture<Void> 
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+        requireNonNull(namespace);
+        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-            readerCompletableFuture.thenAccept(reader -> {
-                initPolicesCache(reader, result);
-                result.thenRun(() -> readMorePolicies(reader));
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-                cleanCacheAndCloseReader(namespace, false);
-                result.completeExceptionally(ex);
+            final CompletableFuture<Void> initFuture = readerCompletableFuture
+                    .thenCompose(reader -> {
+                        final CompletableFuture<Void> stageFuture = new 
CompletableFuture<>();
+                        initPolicesCache(reader, stageFuture);
+                        return stageFuture
+                                // Read policies in background
+                                .thenAccept(__ -> 
readMorePoliciesAsync(reader));
+                    });
+            initFuture.exceptionally(ex -> {
+                try {
+                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
+                    cleanCacheAndCloseReader(namespace, false);
+                } catch (Throwable cleanupEx) {
+                    // Catch cleanup failures so they do not break the callback chain
+                    log.error("[{}] Failed to cleanup reader on 
__change_events topic", namespace, cleanupEx);
+                }
                 return null;
             });
-        }
+            // let caller know we've got an exception.
+            return initFuture;
+        });
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClientWithRetry(
@@ -381,8 +421,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Reach the end of the system topic.", 
reader.getSystemTopic().getTopicName());
                 }
-                policyCacheInitMap.computeIfPresent(
-                        
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
@@ -395,6 +434,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         }
                     }
                 }));
+
                 future.complete(null);
             }
         });
@@ -420,7 +460,13 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> 
reader) {
+    /**
+     * This is an async method for the background reader to continue syncing 
new messages.
+     *
+     * Note: Do not make any blocking calls here, because doing so would block
+     * {@link 
SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} and 
therefore topic loading.
+     */
+    private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         reader.readNextAsync()
                 .thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
@@ -428,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 })
                 .whenComplete((__, ex) -> {
                     if (ex == null) {
-                        readMorePolicies(reader);
+                        readMorePoliciesAsync(reader);
                     } else {
                         Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
@@ -437,7 +483,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         } else {
                             log.warn("Read more topic polices exception, read 
again.", ex);
-                            readMorePolicies(reader);
+                            readMorePoliciesAsync(reader);
                         }
                     }
                 });
@@ -605,7 +651,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @VisibleForTesting
-    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+    public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName 
namespaceName) {
         return policyCacheInitMap.get(namespaceName);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index c4bcc0c3935..aa3a6aaeff2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Topic policies service.
@@ -109,6 +111,32 @@ public interface TopicPoliciesService {
         return response;
     }
 
+    /**
+     * Asynchronously retrieves topic policies.
+     * This triggers the Pulsar broker's internal client to load policies from 
the
system topic `persistent://tenant/namespace/__change_events`.
+     *
+     * @param topicName The name of the topic.
+     * @param isGlobal Indicates if the policies are global.
+     * @return A CompletableFuture containing an Optional of TopicPolicies.
+     * @throws NullPointerException If the topicName is null.
+     */
+    @Nonnull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName, boolean isGlobal);
+
+    /**
+     * Asynchronously retrieves topic policies.
+     * This triggers the Pulsar broker's internal client to load policies from 
the
system topic `persistent://tenant/namespace/__change_events`.
+     *
+     * NOTE: If local policies are not available, it will fall back to using 
the topic's global policies.
+     * @param topicName The name of the topic.
+     * @return A CompletableFuture containing an Optional of TopicPolicies.
+     * @throws NullPointerException If the topicName is null.
+     */
+    @Nonnull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName);
+
     /**
      * Get policies for a topic without cache async.
      * @param topicName topic name
@@ -162,6 +190,19 @@ public interface TopicPoliciesService {
             return null;
         }
 
+        @NotNull
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName,
+                                                                               
 boolean isGlobal) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+
+        @NotNull
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+
         @Override
         public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
             return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index a4f6bd4650f..25ad6cab942 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -127,6 +127,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     @Override
     @BeforeMethod
     protected void setup() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(false);
         super.internalSetup();
         persistentTopics = spy(PersistentTopics.class);
         persistentTopics.setServletContext(new MockServletContext());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 87471f4972f..faf141a5d1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -180,7 +180,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
         //make sure namespace policy reader is fully started.
         Awaitility.await().untilAsserted(()-> {
-            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
         });
 
         //load the topic.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
new file mode 100644
index 00000000000..672fc2c95f8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class TopicPoliciesWithBrokerRestartTest extends 
MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+
+    @Test
+    public void testRetentionWithBrokerRestart() throws Exception {
+        final int messages = 1_000;
+        final int topicNum = 500;
+        // (1) Init topic
+        admin.namespaces().createNamespace("public/retention");
+        final String topicName = 
"persistent://public/retention/retention_with_broker_restart";
+        admin.topics().createNonPartitionedTopic(topicName);
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.topics().createNonPartitionedTopic(shadowTopicNames);
+        }
+        // (2) Set retention
+        final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 
20);
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.topicPolicies().setRetention(shadowTopicNames, 
retentionPolicies);
+        }
+        admin.topicPolicies().setRetention(topicName, retentionPolicies);
+        // (3) Send messages
+        @Cleanup
+        final Producer<byte[]> publisher = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < messages; i++) {
+            publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
+        }
+        // (4) Check configuration
+        Awaitility.await().untilAsserted(() -> {
+            final PersistentTopic persistentTopic1 = (PersistentTopic)
+                    pulsar.getBrokerService().getTopic(topicName, 
true).join().get();
+            final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
+            
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
+            
Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
+                    TimeUnit.MINUTES.toMillis(20));
+        });
+        // (5) Restart broker
+        restartBroker();
+        // (6) Check configuration again
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.lookups().lookupTopic(shadowTopicNames);
+            final PersistentTopic persistentTopicTmp = (PersistentTopic)
+                    pulsar.getBrokerService().getTopic(shadowTopicNames, 
true).join().get();
+            final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) 
persistentTopicTmp.getManagedLedger();
+            
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
+            
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
+                    TimeUnit.MINUTES.toMillis(20));
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
index efd8b66d754..234af7afa8d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
@@ -84,6 +84,8 @@ public class TopicsAuthTest extends 
MockedPulsarServiceBaseTest {
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
         super.internalSetup();
         PulsarAdminBuilder pulsarAdminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
                 ? brokerUrl.toString() : brokerUrlTls.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
index 6ffcecbeb9f..942a42fa7aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -60,6 +60,8 @@ public class AuthLogsTest extends MockedPulsarServiceBaseTest 
{
         conf.setAuthorizationEnabled(true);
         conf.setAuthorizationAllowWildcardsMatching(true);
         conf.setSuperUserRoles(Sets.newHashSet("super"));
+        
conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
+        conf.setBrokerClientAuthenticationParameters("user:pass.pass");
         internalSetup();
 
         try (PulsarAdmin admin = PulsarAdmin.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
index 0b1726617f7..25ac59796b0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -29,7 +29,10 @@ import org.slf4j.LoggerFactory;
 
 public class MockAuthentication implements Authentication {
     private static final Logger log = 
LoggerFactory.getLogger(MockAuthentication.class);
-    private final String user;
+    private String user;
+
+    public MockAuthentication() {
+    }
 
     public MockAuthentication(String user) {
         this.user = user;
@@ -67,6 +70,7 @@ public class MockAuthentication implements Authentication {
 
     @Override
     public void configure(Map<String, String> authParams) {
+        this.user = authParams.get("user");
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 5803165c1ac..2c8ef373f09 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -42,6 +42,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -53,6 +54,8 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -235,6 +238,22 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
 
     protected final void init() throws Exception {
         doInitConf();
+        // Try to configure the broker's internal client
+        if (conf.getWebServicePortTls().isPresent()
+            && 
conf.getAuthenticationProviders().contains(AuthenticationProviderTls.class.getName())
+            && !conf.isTlsEnabledWithKeyStore()) {
+            // enabled TLS
+            if (conf.getBrokerClientAuthenticationPlugin() == null
+                || 
conf.getBrokerClientAuthenticationPlugin().equals(AuthenticationDisabled.class.getName()))
 {
+                
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+                conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + 
BROKER_CERT_FILE_PATH
+                                                             + ",tlsKeyFile:" 
+ BROKER_KEY_FILE_PATH);
+                conf.setBrokerClientTlsEnabled(true);
+                conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
+                conf.setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
+                conf.setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
+            }
+        }
         startBroker();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 951892f4ebf..5252407892e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -304,6 +304,7 @@ public class BrokerBookieIsolationTest {
                 bookies[3].getBookieId());
 
         ServiceConfiguration config = new ServiceConfiguration();
+        config.setTopicLevelPoliciesEnabled(false);
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
@@ -612,9 +613,9 @@ public class BrokerBookieIsolationTest {
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
+        config.setTopicLevelPoliciesEnabled(false);
         config.setAdvertisedAddress("localhost");
         
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
-
         config.setManagedLedgerDefaultEnsembleSize(2);
         config.setManagedLedgerDefaultWriteQuorum(2);
         config.setManagedLedgerDefaultAckQuorum(2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 31b5bcb23cd..5b70ff99675 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -141,7 +141,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         // Wait for all topic policies updated.
         Awaitility.await().untilAsserted(() ->
                 Assert.assertTrue(systemTopicBasedTopicPoliciesService
-                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
+                        
.getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
 
         // Assert broker is cache all topic policies
         Awaitility.await().untilAsserted(() ->
@@ -304,8 +304,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
     @Test
     public void testGetPolicyTimeout() throws Exception {
         SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
-        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
-        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
+        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
+        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new 
CompletableFuture<>());
         long start = System.currentTimeMillis();
         Backoff backoff = new BackoffBuilder()
                 .setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index e29d015c45d..9995b6a28a9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -45,6 +45,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -620,7 +621,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         doReturn(policiesService).when(pulsar).getTopicPoliciesService();
         TopicPolicies policies = new TopicPolicies();
         policies.setRetentionPolicies(retentionPolicies);
-        
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
+        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic));
         persistentTopic.onUpdate(policies);
         verify(persistentTopic, times(1)).checkPersistencePolicies();
         Awaitility.await().untilAsserted(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index e3bd321d763..042c9b328d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -30,6 +30,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.tls.PublicSuffixMatcher;
 import org.apache.pulsar.common.tls.TlsHostnameVerifier;
+import org.assertj.core.util.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -141,6 +142,7 @@ public class AuthenticationTlsHostnameVerificationTest 
extends ProducerConsumerB
         // setup broker cert which has CN = "pulsar" different than broker's 
hostname="localhost"
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
+        
conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
         conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
         conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
@@ -182,6 +184,7 @@ public class AuthenticationTlsHostnameVerificationTest 
extends ProducerConsumerB
         // setup broker cert which has CN = "localhost"
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
+        
conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
         conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
         conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index bdbe8efc8e6..9a36e0683b4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -119,6 +119,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testProducerAndConsumerAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
         setup();
 
@@ -179,6 +180,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testSubscriberPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setEnablePackagesManagement(true);
         
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
         
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
@@ -369,6 +371,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testClearBacklogPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
         setup();
 
@@ -610,6 +613,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testSubscriptionPrefixAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
         setup();
 
@@ -749,6 +753,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testPermissionForProducerCreateInitialSubscription() throws 
Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
         setup();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 2fc8aebf64a..81d65b19204 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -195,7 +195,7 @@ public class MutualAuthenticationTest extends 
ProducerConsumerBase {
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("admin");
         conf.setSuperUserRoles(superUserRoles);
-
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthorizationEnabled(true);
         conf.setAuthenticationEnabled(true);
         Set<String> providersClassNames = 
Sets.newHashSet(MutualAuthenticationProvider.class.getName());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 87f12e6acdc..4d5e7deaf7d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
@@ -92,6 +93,8 @@ public class TokenAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
 
         conf.setClusterName("test");
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index fdf41c4a6ad..ba43ee6d6a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -27,7 +27,9 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import 
org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,11 +90,12 @@ public class TokenOauth2AuthenticatedProducerConsumerTest 
extends ProducerConsum
         conf.setAuthenticationProviders(providers);
 
         
conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
-        conf.setBrokerClientAuthenticationParameters("{\n"
-                + "  \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n"
-                + "  \"issuerUrl\": \"" + server.getIssuer() + "\",\n"
-                + "  \"audience\": \"" + audience + "\",\n"
-                + "}\n");
+        final Map<String, String> oauth2Param = new HashMap<>();
+        oauth2Param.put("privateKey", CREDENTIALS_FILE);
+        oauth2Param.put("issuerUrl", server.getIssuer());
+        oauth2Param.put("audience", audience);
+        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+                
.getMapper().getObjectMapper().writeValueAsString(oauth2Param));
 
         conf.setClusterName("test");
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
index 8e508b6cf20..77405e14201 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
 
 import io.jsonwebtoken.SignatureAlgorithm;
 import lombok.Cleanup;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -49,6 +50,7 @@ import 
org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -83,6 +85,7 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest 
extends ProducerConsume
         super.internalCleanup();
     }
 
+    @SneakyThrows
     protected void internalSetUpForBroker() {
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
@@ -114,6 +117,25 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest 
extends ProducerConsume
 
         conf.setAuthenticationProviders(providers);
         conf.setNumExecutorThreadPoolSize(5);
+        Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+        tlsProtocols.add("TLSv1.3");
+        tlsProtocols.add("TLSv1.2");
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, 
CLIENT_KEYSTORE_FILE_PATH);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, 
CLIENT_KEYSTORE_PW);
+        
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory.getMapper()
+                .getObjectMapper().writeValueAsString(authParams));
+        conf.setBrokerClientTlsEnabled(true);
+        conf.setBrokerClientTlsEnabledWithKeyStore(true);
+        conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
+        conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+        conf.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH);
+        conf.setBrokerClientTlsKeyStoreType(KEYSTORE_TYPE);
+        conf.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW);
+        conf.setBrokerClientTlsProtocols(tlsProtocols);
+
     }
 
     protected void internalSetUpForClient(boolean addCertificates, String 
lookupUrl) throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index 76936334eb0..b9139dabdf0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -85,6 +85,7 @@ public class PatternTopicsConsumerImplAuthTest extends 
ProducerConsumerBase {
         // set isTcpLookup = true, to use BinaryProtoLookupService to get 
topics for a pattern.
         isTcpLookup = true;
 
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
diff --git 
a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
 
b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index d9411e655ad..4e2fd402983 100644
--- 
a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++ 
b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -29,4 +29,5 @@ authenticationEnabled=true
 authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
-loadBalancerOverrideBrokerNicSpeedGbps=2
\ No newline at end of file
+loadBalancerOverrideBrokerNicSpeedGbps=2
+topicLevelPoliciesEnabled=false
\ No newline at end of file
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 8229d929ee5..9c8e5197adf 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -168,7 +168,7 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
                // Expires after an hour
                conf.setBrokerClientAuthenticationParameters(
-                               "entityType:broker,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000));
+                               "entityType:admin,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000));
 
                Set<String> superUserRoles = new HashSet<>();
                superUserRoles.add("admin");
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 99af3b1cf6a..b7cfb874747 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -53,7 +53,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        conf.setBrokerClientAuthenticationParameters("authParam:admin");
         conf.setAuthenticateOriginalAuthData(true);
 
         Set<String> superUserRoles = new HashSet<String>();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 2c8c382b6a5..3259cfd95c7 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -144,7 +144,7 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        conf.setBrokerClientAuthenticationParameters("authParam:admin");
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("admin");
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index e8bb128c8c1..2d97a4b06a8 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -78,6 +78,8 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
     protected void setup() throws Exception {
 
         // enable tls and auth&auth at broker
+        conf.setTopicLevelPoliciesEnabled(false);
+
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 9119ffed4e2..7b550b7270f 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -63,6 +64,9 @@ public class TestPulsarAuth extends 
MockedPulsarServiceBaseTest {
         conf.setProperties(properties);
         conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE));
         conf.setClusterName("c1");
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + AuthTokenUtils
+                .createToken(secretKey, SUPER_USER_ROLE, Optional.empty()));
         internalSetup();
 
         admin.clusters().createCluster("c1", ClusterData.builder().build());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 9f6c5e58a21..035d88c7be0 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -91,6 +91,7 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
                 
"org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
         brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
         brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
+        brokerEnvs.put("topicLevelPoliciesEnabled", "false");
         brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
         spec.brokerEnvs(brokerEnvs);
         pulsarCluster = PulsarCluster.forSpec(spec);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 0a9bb5e1959..87db46f2bb6 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -68,6 +68,7 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
         envMap.put("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(secretKey));
         envMap.put("superUserRoles", "admin");
         envMap.put("brokerDeleteInactiveTopicsEnabled", "false");
+        envMap.put("topicLevelPoliciesEnabled", "false");
 
         for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
             brokerContainer.withEnv(envMap);

Reply via email to