This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c0e5ce5ce07 [fix][broker]Global topic policies do not affect after 
unloading topic and persistence global topic policies never affect (#24279)
c0e5ce5ce07 is described below

commit c0e5ce5ce07186aa8804840c4c3d48368bbdc792
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Fri May 16 13:35:30 2025 +0800

    [fix][broker]Global topic policies do not affect after unloading topic and 
persistence global topic policies never affect (#24279)
    
    (cherry picked from commit 46c2b74f4c9727d6399e9a1b48b031696d8c7cfd)
---
 .../pulsar/broker/service/BrokerService.java       |  89 ++++++++++-------
 .../SystemTopicBasedTopicPoliciesService.java      |  14 +++
 .../broker/service/persistent/PersistentTopic.java |  12 ++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 108 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerServiceTest.java   |   2 +-
 5 files changed, 184 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4ccb111aa00..b48159d7ab5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -94,6 +94,7 @@ import 
org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -1101,7 +1102,10 @@ public class BrokerService implements Closeable {
                     if (!exists && !createIfMissing) {
                         return 
CompletableFuture.completedFuture(Optional.empty());
                     }
-                    return 
getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
+                    // The topic level policies are not needed here; the 
purpose of calling
+                    // "getTopicPoliciesBypassSystemTopic" is to wait for the 
system topic policies initialization.
+                    return getTopicPoliciesBypassSystemTopic(topicName, false)
+                            .exceptionally(ex -> {
                         final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
                         final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
                                 + " topic policies service. topic_name=%s 
error_message=%s", topicName,
@@ -1109,7 +1113,6 @@ public class BrokerService implements Closeable {
                         log.error(errorInfo, rc);
                         throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
                     }).thenCompose(optionalTopicPolicies -> {
-                        final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
                         if (topicName.isPartitioned()) {
                             final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
                             return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
@@ -1120,7 +1123,7 @@ public class BrokerService implements Closeable {
                                                 || 
topicName.getPartitionIndex() < metadata.partitions) {
                                             return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
                                                     
loadOrCreatePersistentTopic(tpName,
-                                                            createIfMissing, 
properties, topicPolicies));
+                                                            createIfMissing, 
properties));
                                         } else {
                                             final String errorMsg =
                                                     String.format("Illegal 
topic partition name %s with max allowed "
@@ -1132,7 +1135,7 @@ public class BrokerService implements Closeable {
                                     });
                         } else {
                             return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-                                    loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies));
+                                    loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties));
                         }
                     });
                 });
@@ -1203,13 +1206,14 @@ public class BrokerService implements Closeable {
         }
     }
 
-    private CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
+    private CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName,
+                                                                               
          boolean isGlobal) {
         Objects.requireNonNull(topicName);
         final ServiceConfiguration serviceConfiguration = 
pulsar.getConfiguration();
         if (serviceConfiguration.isSystemTopicEnabled() && 
serviceConfiguration.isTopicLevelPoliciesEnabled()
                 && 
!NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
                 && 
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
-            return 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+            return 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName, isGlobal);
         } else {
             return CompletableFuture.completedFuture(Optional.empty());
         }
@@ -1620,7 +1624,7 @@ public class BrokerService implements Closeable {
      * @throws RuntimeException
      */
     protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic,
-            boolean createIfMissing, Map<String, String> properties, @Nullable 
TopicPolicies topicPolicies) {
+            boolean createIfMissing, Map<String, String> properties) {
         final CompletableFuture<Optional<Topic>> topicFuture = 
FutureUtil.createFutureWithTimeout(
                 
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), 
executor(),
                 () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
@@ -1636,7 +1640,7 @@ public class BrokerService implements Closeable {
 
                     if (topicLoadSemaphore.tryAcquire()) {
                         checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture,
-                                properties, topicPolicies);
+                                properties);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1645,7 +1649,7 @@ public class BrokerService implements Closeable {
                         });
                     } else {
                         pendingTopicLoadingQueue.add(new 
TopicLoadingContext(topic,
-                                topicFuture, properties, topicPolicies));
+                                topicFuture, properties));
                         if (log.isDebugEnabled()) {
                             log.debug("topic-loading for {} added into pending 
queue", topic);
                         }
@@ -1687,7 +1691,7 @@ public class BrokerService implements Closeable {
 
     private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties, 
@Nullable TopicPolicies topicPolicies) {
+                                       Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
@@ -1701,8 +1705,8 @@ public class BrokerService implements Closeable {
                         }
                         propertiesFuture.thenAccept(finalProperties ->
                                 //TODO add topicName in properties?
-                                createPersistentTopic(topic, createIfMissing, 
topicFuture,
-                                        finalProperties, topicPolicies)
+                                createPersistentTopic0(topic, createIfMissing, 
topicFuture,
+                                        finalProperties)
                         ).exceptionally(throwable -> {
                             log.warn("[{}] Read topic property failed", topic, 
throwable);
                             pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
@@ -1723,17 +1727,10 @@ public class BrokerService implements Closeable {
                 });
     }
 
-
     @VisibleForTesting
     public void createPersistentTopic0(final String topic, boolean 
createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
-        createPersistentTopic(topic, createIfMissing, topicFuture, properties, 
null);
-    }
-
-    private void createPersistentTopic(final String topic, boolean 
createIfMissing,
-                                       CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties, 
@Nullable TopicPolicies topicPolicies) {
         TopicName topicName = TopicName.get(topic);
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
@@ -1750,7 +1747,7 @@ public class BrokerService implements Closeable {
                 : CompletableFuture.completedFuture(null);
 
         maxTopicsCheck.thenCompose(__ ->
-                        getManagedLedgerConfig(topicName, 
topicPolicies)).thenAccept(managedLedgerConfig -> {
+                        
getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
                 // init managedLedger interceptor
                 Set<BrokerEntryMetadataInterceptor> interceptors = new 
HashSet<>();
@@ -1884,31 +1881,45 @@ public class BrokerService implements Closeable {
         });
     }
 
-    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName) {
-        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
-                getTopicPoliciesBypassSystemTopic(topicName);
-        return topicPoliciesFuture.thenCompose(optionalTopicPolicies ->
-                getManagedLedgerConfig(topicName, 
optionalTopicPolicies.orElse(null)));
-    }
-
-    private CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName,
-                                                                          
@Nullable TopicPolicies topicPolicies) {
+    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(TopicName topicName) {
         requireNonNull(topicName);
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = 
pulsar.getPulsarResources().getLocalPolicies();
+        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
+                getTopicPoliciesBypassSystemTopic(topicName, false);
+        final CompletableFuture<Optional<TopicPolicies>> 
globalTopicPoliciesFuture =
+                getTopicPoliciesBypassSystemTopic(topicName, true);
         final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
         final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
-        return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> 
{
+        return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture, 
(topicP, globalTopicP) -> {
+            return new ImmutablePair<>(topicP, globalTopicP);
+        }).thenCombine(nsPolicies, (topicPoliciesPair, np) -> {
+            return new ImmutablePair<>(topicPoliciesPair, np);
+        }).thenCombine(lcPolicies, (combined, localPolicies) -> {
+            Optional<TopicPolicies> topicP = combined.getLeft().getLeft();
+            Optional<TopicPolicies> globalTopicP = 
combined.getLeft().getRight();
+            Optional<Policies> policies = combined.getRight();
+
             PersistencePolicies persistencePolicies = null;
             RetentionPolicies retentionPolicies = null;
             OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-            if (topicPolicies != null) {
-                persistencePolicies = topicPolicies.getPersistence();
-                retentionPolicies = topicPolicies.getRetentionPolicies();
-                topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+            if (topicP.isPresent() && topicP.get().getPersistence() != null) {
+                persistencePolicies = topicP.get().getPersistence();
+            } else if (globalTopicP.isPresent() && 
globalTopicP.get().getPersistence() != null) {
+                persistencePolicies = globalTopicP.get().getPersistence();
+            }
+            if (topicP.isPresent() && topicP.get().getRetentionPolicies() != 
null) {
+                retentionPolicies = topicP.get().getRetentionPolicies();
+            } else if (globalTopicP.isPresent() && 
globalTopicP.get().getRetentionPolicies() != null) {
+                retentionPolicies = globalTopicP.get().getRetentionPolicies();
+            }
+            if (topicP.isPresent() && topicP.get().getOffloadPolicies() != 
null) {
+                topicLevelOffloadPolicies = topicP.get().getOffloadPolicies();
+            } else if (globalTopicP.isPresent() && 
globalTopicP.get().getOffloadPolicies() != null) {
+                topicLevelOffloadPolicies = 
globalTopicP.get().getOffloadPolicies();
             }
 
             if (persistencePolicies == null) {
@@ -2057,6 +2068,13 @@ public class BrokerService implements Closeable {
             managedLedgerConfig.setNewEntriesCheckDelayInMillis(
                     
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
             return managedLedgerConfig;
+        }).exceptionally(ex -> {
+            final Throwable rc = FutureUtil.unwrapCompletionException(ex);
+            final String errorInfo = String.format("Topic creation encountered 
an exception by initialize"
+                            + " topic policies service. topic_name=%s 
error_message=%s", topicName,
+                    rc.getMessage());
+            log.error(errorInfo, rc);
+            throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
         });
     }
 
@@ -3214,7 +3232,7 @@ public class BrokerService implements Closeable {
             checkOwnershipAndCreatePersistentTopic(topic,
                     true,
                     pendingFuture,
-                    pendingTopic.getProperties(), 
pendingTopic.getTopicPolicies());
+                    pendingTopic.getProperties());
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
@@ -3821,6 +3839,5 @@ public class BrokerService implements Closeable {
         private final String topic;
         private final CompletableFuture<Optional<Topic>> topicFuture;
         private final Map<String, String> properties;
-        private final TopicPolicies topicPolicies;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 88a4e1d755a..50eba440747 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.events.TopicPoliciesEvent;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -312,6 +313,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName,
                                                                             
boolean isGlobal) {
         requireNonNull(topicName);
+        final var namespace = topicName.getNamespaceObject();
+        if (NamespaceService.isHeartbeatNamespace(namespace) || 
isSelf(topicName)) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
         final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
         return preparedFuture.thenApply(__ -> {
             final TopicPolicies candidatePolicies = isGlobal
@@ -818,4 +823,13 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             readerCaches.clear();
         }
     }
+
+    private static boolean isSelf(TopicName topicName) {
+        final var localName = topicName.getLocalName();
+        if (!topicName.isPartitioned()) {
+            return 
localName.equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        }
+        final var index = 
localName.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
+        return localName.substring(0, 
index).equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22f4afe910a..bc9bf6288a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4129,10 +4129,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
             brokerService.getPulsar().getTopicPoliciesService()
                     
.registerListener(TopicName.getPartitionedTopicName(topic), this);
-            return CompletableFuture.completedFuture(null).thenRunAsync(() -> 
onUpdate(
-                            brokerService.getPulsar().getTopicPoliciesService()
-                                    
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
-                    brokerService.getTopicOrderedExecutor());
+            final var topicPoliciesService = 
brokerService.getPulsar().getTopicPoliciesService();
+            final var partitionedTopicName = 
TopicName.getPartitionedTopicName(topic);
+            return 
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName, true)
+                    .thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),
+                            brokerService.getTopicOrderedExecutor())
+                    .thenCompose(__ -> 
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName, false))
+                    .thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),
+                            brokerService.getTopicOrderedExecutor());
         }
         return CompletableFuture.completedFuture(null);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 1d860fbc3fc..ade80f9b2ab 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -609,6 +609,114 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(topic, false);
     }
 
+    @Test
+    public void testGlobalPolicyStillAffectsAfterUnloading() throws Exception {
+        // create topic and load it up.
+        final String namespace = myNamespace;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        final SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        // Set non-global policy of the limitation of max consumers.
+        // Set global policy of the limitation of max producers.
+        admin.topicPolicies(false).setMaxConsumers(topic, 10);
+        admin.topicPolicies(true).setMaxProducers(topic, 20);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
true).join().get()
+                    .getMaxProducerPerTopic(), 20);
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20);
+        });
+
+        // Reload topic and verify: both the global and the non-global policy 
still take effect.
+        admin.topics().unload(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20);
+        });
+
+        // cleanup.
+        admin.topics().delete(topic, false);
+    }
+
+    @Test
+    public void testRetentionGlobalPolicyAffects() throws Exception {
+        // create topic and load it up.
+        final String namespace = myNamespace;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        final SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        // Set non-global policy of the limitation of max consumers.
+        // Set global retention policies (global persistence policies are set further below).
+        admin.topicPolicies(false).setMaxConsumers(topic, 10);
+        RetentionPolicies retentionPolicies = new RetentionPolicies(100, 200);
+        admin.topicPolicies(true).setRetention(topic, retentionPolicies);
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            ManagedLedgerConfig mlConfig = 
persistentTopic.getManagedLedger().getConfig();
+            assertEquals(mlConfig.getRetentionTimeMillis(), 
TimeUnit.MINUTES.toMillis(100));
+            assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+        });
+        PersistencePolicies persistencePolicy = new PersistencePolicies(3, 2, 
1, 4);
+        admin.topicPolicies(true).setPersistence(topic, persistencePolicy);
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            ManagedLedgerConfig mlConfig = 
persistentTopic.getManagedLedger().getConfig();
+            assertEquals(mlConfig.getRetentionTimeMillis(), 
TimeUnit.MINUTES.toMillis(100));
+            assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+            assertEquals(mlConfig.getEnsembleSize(), 3);
+            assertEquals(mlConfig.getWriteQuorumSize(), 2);
+            assertEquals(mlConfig.getAckQuorumSize(), 1);
+            assertEquals(mlConfig.getThrottleMarkDelete(), 4D);
+        });
+
+        // Reload topic and verify: the global retention and persistence policies still take effect.
+        admin.topics().unload(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            ManagedLedgerConfig mlConfig = 
persistentTopic.getManagedLedger().getConfig();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            assertEquals(mlConfig.getRetentionTimeMillis(), 
TimeUnit.MINUTES.toMillis(100));
+            assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+            assertEquals(mlConfig.getEnsembleSize(), 3);
+            assertEquals(mlConfig.getWriteQuorumSize(), 2);
+            assertEquals(mlConfig.getAckQuorumSize(), 1);
+            assertEquals(mlConfig.getThrottleMarkDelete(), 4D);
+        });
+
+        // cleanup.
+        admin.topics().delete(topic, false);
+    }
+
     @Test(timeOut = 20000)
     public void testGetSizeBasedBacklogQuotaApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 8407c8d8cef..9be0c67f199 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1166,7 +1166,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // try to create topic which should fail as bundle is disable
         CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
-                .loadOrCreatePersistentTopic(topicName, true, null, null);
+                .loadOrCreatePersistentTopic(topicName, true, null);
 
         try {
             futureResult.get();

Reply via email to