This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 01dfd3e03ea Revert "[fix][broker] Fix inconsistent topic policy 
(#21231)"
01dfd3e03ea is described below

commit 01dfd3e03eab64a17d2d925448f3b571ce710d90
Author: fengyubiao <[email protected]>
AuthorDate: Sat Oct 7 23:45:30 2023 +0800

    Revert "[fix][broker] Fix inconsistent topic policy (#21231)"
    
    This reverts commit a0e2104642c5081986dbe7847c71997b06030eaa.
---
 .../ProxySaslAuthenticationTest.java               |  13 +-
 .../authentication/SaslAuthenticateTest.java       |  11 +-
 .../pulsar/broker/service/BrokerService.java       | 295 ++++++++++-----------
 .../SystemTopicBasedTopicPoliciesService.java      | 100 ++-----
 .../broker/service/TopicPoliciesService.java       |  41 ---
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   1 -
 .../pulsar/broker/admin/TopicPoliciesTest.java     |   2 +-
 .../admin/TopicPoliciesWithBrokerRestartTest.java  | 104 --------
 .../apache/pulsar/broker/admin/TopicsAuthTest.java |   2 -
 .../apache/pulsar/broker/auth/AuthLogsTest.java    |   2 -
 .../pulsar/broker/auth/MockAuthentication.java     |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  19 --
 .../broker/service/BrokerBookieIsolationTest.java  |   3 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |   6 +-
 .../service/persistent/PersistentTopicTest.java    |   3 +-
 .../AuthenticationTlsHostnameVerificationTest.java |   3 -
 .../api/AuthorizationProducerConsumerTest.java     |   5 -
 .../client/api/MutualAuthenticationTest.java       |   2 +-
 .../TokenAuthenticatedProducerConsumerTest.java    |   3 -
 ...kenOauth2AuthenticatedProducerConsumerTest.java |  14 +-
 ...eyStoreTlsProducerConsumerTestWithAuthTest.java |  22 --
 .../impl/PatternTopicsConsumerImplAuthTest.java    |   1 -
 .../configurations/standalone_no_client_auth.conf  |   3 +-
 .../proxy/server/ProxyAuthenticationTest.java      |   2 +-
 .../proxy/server/ProxyForwardAuthDataTest.java     |   2 +-
 .../proxy/server/ProxyRolesEnforcementTest.java    |   2 +-
 .../server/ProxyWithAuthorizationNegTest.java      |   2 -
 .../apache/pulsar/sql/presto/TestPulsarAuth.java   |   4 -
 .../loadbalance/ExtensibleLoadManagerTest.java     |   1 -
 .../integration/presto/TestPulsarSQLAuth.java      |   1 -
 30 files changed, 197 insertions(+), 478 deletions(-)

diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index f0e45aa734a..261efe680f8 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -49,7 +49,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
 import org.slf4j.Logger;
@@ -194,17 +193,15 @@ public class ProxySaslAuthenticationTest extends 
ProducerConsumerBase {
                conf.setAuthenticationProviders(providers);
                conf.setClusterName("test");
                conf.setSuperUserRoles(ImmutableSet.of("client/" + 
localHostname + "@" + kdc.getRealm()));
-               // set admin auth, to verify admin web resources
-               Map<String, String> clientSaslConfig = new HashMap<>();
-               clientSaslConfig.put("saslJaasClientSectionName", 
"PulsarClient");
-               clientSaslConfig.put("serverType", "broker");
-               
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
-               conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-                               
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
 
                super.init();
 
                lookupUrl = new URI(pulsar.getBrokerServiceUrl());
+
+               // set admin auth, to verify admin web resources
+               Map<String, String> clientSaslConfig = new HashMap<>();
+               clientSaslConfig.put("saslJaasClientSectionName", 
"PulsarClient");
+               clientSaslConfig.put("serverType", "broker");
                log.info("set client jaas section name: PulsarClient");
                admin = PulsarAdmin.builder()
                        .serviceHttpUrl(brokerUrl.toString())
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 76c6f023d9a..8a0d0392d13 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.sasl.SaslConstants;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -181,12 +180,7 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         conf.setAuthenticationProviders(providers);
         conf.setClusterName("test");
         conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + 
kdc.getRealm()));
-        Map<String, String> clientSaslConfig = new HashMap<>();
-        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
-        clientSaslConfig.put("serverType", "broker");
-        
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
-        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-                
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
         super.init();
 
         lookupUrl = new URI(pulsar.getWebServiceAddress());
@@ -197,6 +191,9 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
             .authentication(authSasl));
 
         // set admin auth, to verify admin web resources
+        Map<String, String> clientSaslConfig = new HashMap<>();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
         log.info("set client jaas section name: PulsarClient");
         admin = PulsarAdmin.builder()
             .serviceHttpUrl(brokerUrl.toString())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7c7dec864e2..d6b17f4faa4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1769,172 +1769,165 @@ public class BrokerService implements Closeable {
         });
     }
 
-    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName) {
-        requireNonNull(topicName);
+    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(TopicName topicName) {
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = 
pulsar.getPulsarResources().getLocalPolicies();
-        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
-        if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
-            && !NamespaceService.isSystemServiceNamespace(namespace.toString())
-            && 
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
-            topicPoliciesFuture = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
-        } else {
-            topicPoliciesFuture = 
CompletableFuture.completedFuture(Optional.empty());
-        }
-        return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
-            final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
-            final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
-            return nsPolicies.thenCombine(lcPolicies, (policies, 
localPolicies) -> {
-                PersistencePolicies persistencePolicies = null;
-                RetentionPolicies retentionPolicies = null;
-                OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-                if (topicPoliciesOptional.isPresent()) {
-                    final TopicPolicies topicPolicies = 
topicPoliciesOptional.get();
-                    persistencePolicies = topicPolicies.getPersistence();
-                    retentionPolicies = topicPolicies.getRetentionPolicies();
-                    topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
-                }
-
-                if (persistencePolicies == null) {
-                    persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
-                            () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                    
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                    
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                    
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-                }
+        return nsr.getPoliciesAsync(namespace)
+                .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, 
localPolicies) -> {
+                    PersistencePolicies persistencePolicies = null;
+                    RetentionPolicies retentionPolicies = null;
+                    OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+
+                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+                            && 
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                        final TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService()
+                                .getTopicPoliciesIfExists(topicName);
+                        if (topicPolicies != null) {
+                            persistencePolicies = 
topicPolicies.getPersistence();
+                            retentionPolicies = 
topicPolicies.getRetentionPolicies();
+                            topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
+                        }
+                    }
 
-                if (retentionPolicies == null) {
-                    retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-                            () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                    
serviceConfig.getDefaultRetentionSizeInMB())
-                    );
-                }
+                    if (persistencePolicies == null) {
+                        persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
+                                () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                        
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                        
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                        
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+                    }
 
-                ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
-                
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-                
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-                
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-
-                if (serviceConfig.isStrictBookieAffinityEnabled()) {
-                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
-                            IsolatedBookieEnsemblePlacementPolicy.class);
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    } else if (isSystemTopic(topicName)) {
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"*");
-                        properties.put(IsolatedBookieEnsemblePlacementPolicy
-                                .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    } else {
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"");
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    if (retentionPolicies == null) {
+                        retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+                                () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                        
serviceConfig.getDefaultRetentionSizeInMB())
+                        );
                     }
-                } else {
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
+
+                    ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
+                    
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+                    
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+                    
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+                    if (serviceConfig.isStrictBookieAffinityEnabled()) {
                         
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
                                 IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = new HashMap<>();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                        if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
+                            Map<String, Object> properties = new HashMap<>();
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                        } else if (isSystemTopic(topicName)) {
+                            Map<String, Object> properties = new HashMap<>();
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"*");
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy
+                                    .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                        } else {
+                            Map<String, Object> properties = new HashMap<>();
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
"");
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
+                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                        }
+                    } else {
+                        if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
+                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+                                            
IsolatedBookieEnsemblePlacementPolicy.class);
+                            Map<String, Object> properties = new HashMap<>();
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                            
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                    
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                            
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                        }
                     }
-                }
 
-                
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-                
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-                
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-                managedLedgerConfig
-                        
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-                
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
-                        
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
-                
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
-                        
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
-                
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-                managedLedgerConfig
-                        
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                                TimeUnit.MINUTES);
-                managedLedgerConfig
-                        
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                                TimeUnit.MINUTES);
-                
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                        
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-                managedLedgerConfig
-                        
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-                managedLedgerConfig
-                        
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-                
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                        
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-                
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-                
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-                managedLedgerConfig
-                        
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-                managedLedgerConfig
-                        
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-                managedLedgerConfig
-                        
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
-                
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-                
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-                
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-                managedLedgerConfig.setInactiveLedgerRollOverTime(
-                        
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
-                managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
-                        serviceConfig.isCacheEvictionByMarkDeletedPosition());
-                managedLedgerConfig.setMinimumBacklogCursorsForCaching(
-                        
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
-                managedLedgerConfig.setMinimumBacklogEntriesForCaching(
-                        
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
-                managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
-                        
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
-                OffloadPoliciesImpl nsLevelOffloadPolicies =
-                        (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
-                OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
-                        topicLevelOffloadPolicies,
-                        
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
-                        getPulsar().getConfig().getProperties());
-                if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                    
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
-                } else  {
-                    if (topicLevelOffloadPolicies != null) {
-                        try {
-                            LedgerOffloader topicLevelLedgerOffLoader =
-                                    
pulsar().createManagedLedgerOffloader(offloadPolicies);
-                            
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                        } catch (PulsarServerException e) {
-                            throw new RuntimeException(e);
+                    
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+                    
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+                    
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+                    managedLedgerConfig
+                            
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+                    
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+                            
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+                    
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+                            
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+                    
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+                    managedLedgerConfig
+                            
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                                    TimeUnit.MINUTES);
+                    managedLedgerConfig
+                            
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                                    TimeUnit.MINUTES);
+                    
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+                    managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                            
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+                    managedLedgerConfig
+                            
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+                    managedLedgerConfig
+                            
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+                    
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+                    managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                            
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+                    
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+                    
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+                    managedLedgerConfig
+                            
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+                    managedLedgerConfig
+                            
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+                    managedLedgerConfig
+                            
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
+                    
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+                    
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+                    
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+                    managedLedgerConfig.setInactiveLedgerRollOverTime(
+                            
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
+                    managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+                            
serviceConfig.isCacheEvictionByMarkDeletedPosition());
+                    managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+                            
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+                    managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+                            
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+                    managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+                            
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+                    OffloadPoliciesImpl nsLevelOffloadPolicies =
+                            (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
+                    OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
+                            topicLevelOffloadPolicies,
+                            
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
+                            getPulsar().getConfig().getProperties());
+                    if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                        
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+                    } else  {
+                        if (topicLevelOffloadPolicies != null) {
+                            try {
+                                LedgerOffloader topicLevelLedgerOffLoader =
+                                        
pulsar().createManagedLedgerOffloader(offloadPolicies);
+                                
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                            } catch (PulsarServerException e) {
+                                throw new RuntimeException(e);
+                            }
+                        } else {
+                            //If the topic level policy is null, use the 
namespace level
+                            managedLedgerConfig
+                                    
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
                         }
-                    } else {
-                        //If the topic level policy is null, use the namespace 
level
-                        managedLedgerConfig
-                                
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
                     }
-                }
 
-                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-                        
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-                        
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-                return managedLedgerConfig;
-            });
-        });
+                    managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                            
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+                    managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                            
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+                    return managedLedgerConfig;
+                });
     }
 
     private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index ed76d37ae25..09f8de818db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -56,7 +54,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,8 +78,8 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 
     private final Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
             readerCaches = new ConcurrentHashMap<>();
-
-    final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new 
ConcurrentHashMap<>();
+    @VisibleForTesting
+    final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
 
     @VisibleForTesting
     final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = 
new ConcurrentHashMap<>();
@@ -222,12 +219,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                           boolean isGlobal) throws 
TopicPoliciesCacheNotInitException {
         if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
             NamespaceName namespace = topicName.getNamespaceObject();
-            prepareInitPoliciesCacheAsync(namespace);
+            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
         }
 
         MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result 
= new MutablePair<>();
         policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, 
initialized) -> {
-            if (initialized == null || !initialized.isDone()) {
+            if (initialized == null || !initialized) {
                 result.setLeft(new TopicPoliciesCacheNotInitException());
             } else {
                 TopicPolicies topicPolicies =
@@ -245,34 +242,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    @NotNull
-    @Override
-    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName,
-                                                                            
boolean isGlobal) {
-        requireNonNull(topicName);
-        final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
-        return preparedFuture.thenApply(__ -> {
-            final TopicPolicies candidatePolicies = isGlobal
-                    ? 
globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
-                    : 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
-            return Optional.ofNullable(candidatePolicies);
-        });
-    }
-
-    @NotNull
-    @Override
-    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName) {
-        requireNonNull(topicName);
-        final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
-        return preparedFuture.thenApply(__ -> {
-            final TopicPolicies localPolicies = 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
-            if (localPolicies != null) {
-                return Optional.of(localPolicies);
-            }
-            return 
Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
-        });
-    }
-
     @Override
     public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
         return 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -296,48 +265,39 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
         if (NamespaceService.isHeartbeatNamespace(namespace)) {
-            return CompletableFuture.completedFuture(null);
+            result.complete(null);
+            return result;
         }
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
-                return CompletableFuture.completedFuture(null);
+                result.complete(null);
             } else {
-                return prepareInitPoliciesCacheAsync(namespace);
+                prepareInitPoliciesCache(namespace, result);
             }
         }
+        return result;
     }
 
-    private @Nonnull CompletableFuture<Void> 
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
-        requireNonNull(namespace);
-        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
-            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, 
CompletableFuture<Void> result) {
+        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-            final CompletableFuture<Void> initFuture = readerCompletableFuture
-                    .thenCompose(reader -> {
-                        final CompletableFuture<Void> stageFuture = new 
CompletableFuture<>();
-                        initPolicesCache(reader, stageFuture);
-                        return stageFuture
-                                // Read policies in background
-                                .thenAccept(__ -> 
readMorePoliciesAsync(reader));
-                    });
-            initFuture.exceptionally(ex -> {
-                try {
-                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-                    cleanCacheAndCloseReader(namespace, false);
-                } catch (Throwable cleanupEx) {
-                    // Adding this catch to avoid break callback chain
-                    log.error("[{}] Failed to cleanup reader on 
__change_events topic", namespace, cleanupEx);
-                }
+            readerCompletableFuture.thenAccept(reader -> {
+                initPolicesCache(reader, result);
+                result.thenRun(() -> readMorePolicies(reader));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
+                cleanCacheAndCloseReader(namespace, false);
+                result.completeExceptionally(ex);
                 return null;
             });
-            // let caller know we've got an exception.
-            return initFuture;
-        });
+        }
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClientWithRetry(
@@ -421,7 +381,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Reach the end of the system topic.", 
reader.getSystemTopic().getTopicName());
                 }
-
+                policyCacheInitMap.computeIfPresent(
+                        
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
@@ -434,7 +395,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         }
                     }
                 }));
-
                 future.complete(null);
             }
         });
@@ -460,13 +420,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    /**
-     * This is an async method for the background reader to continue syncing 
new messages.
-     *
-     * Note: You should not do any blocking call here. because it will affect
-     * #{@link 
SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method 
to block loading topic.
-     */
-    private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> 
reader) {
+    private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         reader.readNextAsync()
                 .thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
@@ -474,7 +428,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 })
                 .whenComplete((__, ex) -> {
                     if (ex == null) {
-                        readMorePoliciesAsync(reader);
+                        readMorePolicies(reader);
                     } else {
                         Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
@@ -483,7 +437,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         } else {
                             log.warn("Read more topic polices exception, read 
again.", ex);
-                            readMorePoliciesAsync(reader);
+                            readMorePolicies(reader);
                         }
                     }
                 });
@@ -651,7 +605,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @VisibleForTesting
-    public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName 
namespaceName) {
+    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
         return policyCacheInitMap.get(namespaceName);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index aa3a6aaeff2..c4bcc0c3935 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,7 +22,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -32,7 +31,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Topic policies service.
@@ -111,32 +109,6 @@ public interface TopicPoliciesService {
         return response;
     }
 
-    /**
-     * Asynchronously retrieves topic policies.
-     * This triggers the Pulsar broker's internal client to load policies from 
the
-     * system topic `persistent://tenant/namespace/__change_event`.
-     *
-     * @param topicName The name of the topic.
-     * @param isGlobal Indicates if the policies are global.
-     * @return A CompletableFuture containing an Optional of TopicPolicies.
-     * @throws NullPointerException If the topicName is null.
-     */
-    @Nonnull
-    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName, boolean isGlobal);
-
-    /**
-     * Asynchronously retrieves topic policies.
-     * This triggers the Pulsar broker's internal client to load policies from 
the
-     * system topic `persistent://tenant/namespace/__change_event`.
-     *
-     * NOTE: If local policies are not available, it will fallback to using 
topic global policies.
-     * @param topicName The name of the topic.
-     * @return A CompletableFuture containing an Optional of TopicPolicies.
-     * @throws NullPointerException If the topicName is null.
-     */
-    @Nonnull
-    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName);
-
     /**
      * Get policies for a topic without cache async.
      * @param topicName topic name
@@ -190,19 +162,6 @@ public interface TopicPoliciesService {
             return null;
         }
 
-        @NotNull
-        @Override
-        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName,
-                                                                               
 boolean isGlobal) {
-            return CompletableFuture.completedFuture(Optional.empty());
-        }
-
-        @NotNull
-        @Override
-        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName) {
-            return CompletableFuture.completedFuture(Optional.empty());
-        }
-
         @Override
         public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
             return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 8dee90af4af..284e50c8302 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -126,7 +126,6 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     @Override
     @BeforeMethod
     protected void setup() throws Exception {
-        conf.setTopicLevelPoliciesEnabled(false);
         super.internalSetup();
         persistentTopics = spy(PersistentTopics.class);
         persistentTopics.setServletContext(new MockServletContext());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index faf141a5d1c..87471f4972f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -180,7 +180,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
         //make sure namespace policy reader is fully started.
         Awaitility.await().untilAsserted(()-> {
-            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
+            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
         });
 
         //load the topic.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
deleted file mode 100644
index 672fc2c95f8..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.admin;
-
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-@Test(groups = "broker-admin")
-public class TopicPoliciesWithBrokerRestartTest extends 
MockedPulsarServiceBaseTest {
-
-    @Override
-    @BeforeClass(alwaysRun = true)
-    protected void setup() throws Exception {
-        super.internalSetup();
-        setupDefaultTenantAndNamespace();
-    }
-
-    @Override
-    @AfterClass(alwaysRun = true)
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-
-    @Test
-    public void testRetentionWithBrokerRestart() throws Exception {
-        final int messages = 1_000;
-        final int topicNum = 500;
-        // (1) Init topic
-        admin.namespaces().createNamespace("public/retention");
-        final String topicName = 
"persistent://public/retention/retention_with_broker_restart";
-        admin.topics().createNonPartitionedTopic(topicName);
-        for (int i = 0; i < topicNum; i++) {
-            final String shadowTopicNames = topicName + "_" + i;
-            admin.topics().createNonPartitionedTopic(shadowTopicNames);
-        }
-        // (2) Set retention
-        final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 
20);
-        for (int i = 0; i < topicNum; i++) {
-            final String shadowTopicNames = topicName + "_" + i;
-            admin.topicPolicies().setRetention(shadowTopicNames, 
retentionPolicies);
-        }
-        admin.topicPolicies().setRetention(topicName, retentionPolicies);
-        // (3) Send messages
-        @Cleanup
-        final Producer<byte[]> publisher = pulsarClient.newProducer()
-                .topic(topicName)
-                .create();
-        for (int i = 0; i < messages; i++) {
-            publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
-        }
-        // (4) Check configuration
-        Awaitility.await().untilAsserted(() -> {
-            final PersistentTopic persistentTopic1 = (PersistentTopic)
-                    pulsar.getBrokerService().getTopic(topicName, 
true).join().get();
-            final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
-            
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
-            
Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
-                    TimeUnit.MINUTES.toMillis(20));
-        });
-        // (5) Restart broker
-        restartBroker();
-        // (6) Check configuration again
-        for (int i = 0; i < topicNum; i++) {
-            final String shadowTopicNames = topicName + "_" + i;
-            admin.lookups().lookupTopic(shadowTopicNames);
-            final PersistentTopic persistentTopicTmp = (PersistentTopic)
-                    pulsar.getBrokerService().getTopic(shadowTopicNames, 
true).join().get();
-            final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) 
persistentTopicTmp.getManagedLedger();
-            
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
-            
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
-                    TimeUnit.MINUTES.toMillis(20));
-        }
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
index 234af7afa8d..efd8b66d754 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
@@ -84,8 +84,6 @@ public class TopicsAuthTest extends 
MockedPulsarServiceBaseTest {
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
-        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
-        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
         super.internalSetup();
         PulsarAdminBuilder pulsarAdminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
                 ? brokerUrl.toString() : brokerUrlTls.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
index 942a42fa7aa..6ffcecbeb9f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -60,8 +60,6 @@ public class AuthLogsTest extends MockedPulsarServiceBaseTest 
{
         conf.setAuthorizationEnabled(true);
         conf.setAuthorizationAllowWildcardsMatching(true);
         conf.setSuperUserRoles(Sets.newHashSet("super"));
-        
conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("user:pass.pass");
         internalSetup();
 
         try (PulsarAdmin admin = PulsarAdmin.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
index 25ac59796b0..0b1726617f7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -29,10 +29,7 @@ import org.slf4j.LoggerFactory;
 
 public class MockAuthentication implements Authentication {
     private static final Logger log = 
LoggerFactory.getLogger(MockAuthentication.class);
-    private String user;
-
-    public MockAuthentication() {
-    }
+    private final String user;
 
     public MockAuthentication(String user) {
         this.user = user;
@@ -70,7 +67,6 @@ public class MockAuthentication implements Authentication {
 
     @Override
     public void configure(Map<String, String> authParams) {
-        this.user = authParams.get("user");
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 0f04088e21a..b16dc27e265 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -42,7 +42,6 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -54,8 +53,6 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -225,22 +222,6 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
 
     protected final void init() throws Exception {
         doInitConf();
-        // trying to config the broker internal client
-        if (conf.getWebServicePortTls().isPresent()
-            && 
conf.getAuthenticationProviders().contains(AuthenticationProviderTls.class.getName())
-            && !conf.isTlsEnabledWithKeyStore()) {
-            // enabled TLS
-            if (conf.getBrokerClientAuthenticationPlugin() == null
-                || 
conf.getBrokerClientAuthenticationPlugin().equals(AuthenticationDisabled.class.getName()))
 {
-                
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
-                conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + 
BROKER_CERT_FILE_PATH
-                                                             + ",tlsKeyFile:" 
+ BROKER_KEY_FILE_PATH);
-                conf.setBrokerClientTlsEnabled(true);
-                conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
-                conf.setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
-                conf.setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
-            }
-        }
         startBroker();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 5252407892e..951892f4ebf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -304,7 +304,6 @@ public class BrokerBookieIsolationTest {
                 bookies[3].getBookieId());
 
         ServiceConfiguration config = new ServiceConfiguration();
-        config.setTopicLevelPoliciesEnabled(false);
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
@@ -613,9 +612,9 @@ public class BrokerBookieIsolationTest {
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
-        config.setTopicLevelPoliciesEnabled(false);
         config.setAdvertisedAddress("localhost");
         
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
+
         config.setManagedLedgerDefaultEnsembleSize(2);
         config.setManagedLedgerDefaultWriteQuorum(2);
         config.setManagedLedgerDefaultAckQuorum(2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 5b70ff99675..31b5bcb23cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -141,7 +141,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         // Wait for all topic policies updated.
         Awaitility.await().untilAsserted(() ->
                 Assert.assertTrue(systemTopicBasedTopicPoliciesService
-                        
.getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
+                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
 
         // Assert broker is cache all topic policies
         Awaitility.await().untilAsserted(() ->
@@ -304,8 +304,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
     @Test
     public void testGetPolicyTimeout() throws Exception {
         SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
-        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
-        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new 
CompletableFuture<>());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
+        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
         long start = System.currentTimeMillis();
         Backoff backoff = new BackoffBuilder()
                 .setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ac2727e33eb..41704af0b8b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -45,7 +45,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -621,7 +620,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         doReturn(policiesService).when(pulsar).getTopicPoliciesService();
         TopicPolicies policies = new TopicPolicies();
         policies.setRetentionPolicies(retentionPolicies);
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic));
+        
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
         persistentTopic.onUpdate(policies);
         verify(persistentTopic, times(1)).checkPersistencePolicies();
         Awaitility.await().untilAsserted(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index a392f150e85..f2631f59121 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -30,7 +30,6 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.tls.PublicSuffixMatcher;
 import org.apache.pulsar.common.tls.TlsHostnameVerifier;
-import org.assertj.core.util.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -148,7 +147,6 @@ public class AuthenticationTlsHostnameVerificationTest 
extends ProducerConsumerB
         // setup broker cert which has CN = "pulsar" different than broker's 
hostname="localhost"
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
-        
conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
         conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
@@ -190,7 +188,6 @@ public class AuthenticationTlsHostnameVerificationTest 
extends ProducerConsumerB
         // setup broker cert which has CN = "localhost"
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
-        
conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
         conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index ba41848bf2c..0ce3b7df07d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -119,7 +119,6 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testProducerAndConsumerAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
-        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
         setup();
 
@@ -180,7 +179,6 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testSubscriberPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
-        conf.setTopicLevelPoliciesEnabled(false);
         conf.setEnablePackagesManagement(true);
         
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
         
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
@@ -371,7 +369,6 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testClearBacklogPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
-        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
         setup();
 
@@ -613,7 +610,6 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testSubscriptionPrefixAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
-        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
         setup();
 
@@ -698,7 +694,6 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     public void testPermissionForProducerCreateInitialSubscription() throws 
Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
-        conf.setTopicLevelPoliciesEnabled(false);
         
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
         setup();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 81d65b19204..2fc8aebf64a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -195,7 +195,7 @@ public class MutualAuthenticationTest extends 
ProducerConsumerBase {
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("admin");
         conf.setSuperUserRoles(superUserRoles);
-        conf.setTopicLevelPoliciesEnabled(false);
+
         conf.setAuthorizationEnabled(true);
         conf.setAuthenticationEnabled(true);
         Set<String> providersClassNames = 
Sets.newHashSet(MutualAuthenticationProvider.class.getName());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 4d5e7deaf7d..87f12e6acdc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -37,7 +37,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
@@ -93,8 +92,6 @@ public class TokenAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
-        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
-        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
 
         conf.setClusterName("test");
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index ba43ee6d6a2..fdf41c4a6ad 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -27,9 +27,7 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +39,6 @@ import 
org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,12 +87,11 @@ public class TokenOauth2AuthenticatedProducerConsumerTest 
extends ProducerConsum
         conf.setAuthenticationProviders(providers);
 
         
conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
-        final Map<String, String> oauth2Param = new HashMap<>();
-        oauth2Param.put("privateKey", CREDENTIALS_FILE);
-        oauth2Param.put("issuerUrl", server.getIssuer());
-        oauth2Param.put("audience", audience);
-        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-                
.getMapper().getObjectMapper().writeValueAsString(oauth2Param));
+        conf.setBrokerClientAuthenticationParameters("{\n"
+                + "  \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n"
+                + "  \"issuerUrl\": \"" + server.getIssuer() + "\",\n"
+                + "  \"audience\": \"" + audience + "\",\n"
+                + "}\n");
 
         conf.setClusterName("test");
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
index 77405e14201..8e508b6cf20 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
@@ -32,7 +32,6 @@ import java.util.function.Supplier;
 
 import io.jsonwebtoken.SignatureAlgorithm;
 import lombok.Cleanup;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -50,7 +49,6 @@ import 
org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -85,7 +83,6 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest 
extends ProducerConsume
         super.internalCleanup();
     }
 
-    @SneakyThrows
     protected void internalSetUpForBroker() {
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
@@ -117,25 +114,6 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest 
extends ProducerConsume
 
         conf.setAuthenticationProviders(providers);
         conf.setNumExecutorThreadPoolSize(5);
-        Set<String> tlsProtocols = Sets.newConcurrentHashSet();
-        tlsProtocols.add("TLSv1.3");
-        tlsProtocols.add("TLSv1.2");
-        
conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
-        Map<String, String> authParams = new HashMap<>();
-        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
-        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, 
CLIENT_KEYSTORE_FILE_PATH);
-        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, 
CLIENT_KEYSTORE_PW);
-        
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory.getMapper()
-                .getObjectMapper().writeValueAsString(authParams));
-        conf.setBrokerClientTlsEnabled(true);
-        conf.setBrokerClientTlsEnabledWithKeyStore(true);
-        conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
-        conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
-        conf.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH);
-        conf.setBrokerClientTlsKeyStoreType(KEYSTORE_TYPE);
-        conf.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW);
-        conf.setBrokerClientTlsProtocols(tlsProtocols);
-
     }
 
     protected void internalSetUpForClient(boolean addCertificates, String 
lookupUrl) throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index b9139dabdf0..76936334eb0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -85,7 +85,6 @@ public class PatternTopicsConsumerImplAuthTest extends 
ProducerConsumerBase {
         // set isTcpLookup = true, to use BinaryProtoLookupService to get 
topics for a pattern.
         isTcpLookup = true;
 
-        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
diff --git 
a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
 
b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index 4e2fd402983..d9411e655ad 100644
--- 
a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++ 
b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -29,5 +29,4 @@ authenticationEnabled=true
 authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
-loadBalancerOverrideBrokerNicSpeedGbps=2
-topicLevelPoliciesEnabled=false
\ No newline at end of file
+loadBalancerOverrideBrokerNicSpeedGbps=2
\ No newline at end of file
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 9c8e5197adf..8229d929ee5 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -168,7 +168,7 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
                // Expires after an hour
                conf.setBrokerClientAuthenticationParameters(
-                               "entityType:admin,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000));
+                               "entityType:broker,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000));
 
                Set<String> superUserRoles = new HashSet<>();
                superUserRoles.add("admin");
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index b7cfb874747..99af3b1cf6a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -53,7 +53,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("authParam:admin");
+        conf.setBrokerClientAuthenticationParameters("authParam:broker");
         conf.setAuthenticateOriginalAuthData(true);
 
         Set<String> superUserRoles = new HashSet<String>();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 3259cfd95c7..2c8c382b6a5 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -144,7 +144,7 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("authParam:admin");
+        conf.setBrokerClientAuthenticationParameters("authParam:broker");
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("admin");
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 2d97a4b06a8..e8bb128c8c1 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -78,8 +78,6 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
     protected void setup() throws Exception {
 
         // enable tls and auth&auth at broker
-        conf.setTopicLevelPoliciesEnabled(false);
-
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 7b550b7270f..9119ffed4e2 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -38,7 +38,6 @@ import 
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -64,9 +63,6 @@ public class TestPulsarAuth extends 
MockedPulsarServiceBaseTest {
         conf.setProperties(properties);
         conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE));
         conf.setClusterName("c1");
-        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
-        conf.setBrokerClientAuthenticationParameters("token:" + AuthTokenUtils
-                .createToken(secretKey, SUPER_USER_ROLE, Optional.empty()));
         internalSetup();
 
         admin.clusters().createCluster("c1", ClusterData.builder().build());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 49e5ae37834..057039edc3b 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -89,7 +89,6 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
                 
"org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
         brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
         brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
-        brokerEnvs.put("topicLevelPoliciesEnabled", "false");
         brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
         spec.brokerEnvs(brokerEnvs);
         pulsarCluster = PulsarCluster.forSpec(spec);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 87db46f2bb6..0a9bb5e1959 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -68,7 +68,6 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
         envMap.put("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(secretKey));
         envMap.put("superUserRoles", "admin");
         envMap.put("brokerDeleteInactiveTopicsEnabled", "false");
-        envMap.put("topicLevelPoliciesEnabled", "false");
 
         for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
             brokerContainer.withEnv(envMap);

Reply via email to