This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ec2f17aca56 [fix][broker][branch-2.10] Fix inconsistent topic policy (#21258)
ec2f17aca56 is described below

commit ec2f17aca565f102762ed3300c0ff393a31ff928
Author: Qiang Zhao <[email protected]>
AuthorDate: Sat Oct 7 14:36:44 2023 +0800

    [fix][broker][branch-2.10] Fix inconsistent topic policy (#21258)
---
 .../pulsar/broker/service/BrokerService.java       | 239 +++++++++++----------
 .../SystemTopicBasedTopicPoliciesService.java      |  99 ++++++---
 .../broker/service/TopicPoliciesService.java       |  40 ++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     |   2 +-
 .../admin/TopicPoliciesWithBrokerRestartTest.java  | 112 ++++++++++
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |   6 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |   4 +-
 7 files changed, 352 insertions(+), 150 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 08550886ecb..b028bfa3e4f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -150,6 +150,7 @@ import 
org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
@@ -1577,137 +1578,141 @@ public class BrokerService implements Closeable {
         });
     }
 
-    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(TopicName topicName) {
+    public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@Nonnull TopicName topicName) {
+        requireNonNull(topicName);
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = 
pulsar.getPulsarResources().getLocalPolicies();
-        return nsr.getPoliciesAsync(namespace)
-                .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, 
localPolicies) -> {
-                    PersistencePolicies persistencePolicies = null;
-                    RetentionPolicies retentionPolicies = null;
-                    OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
-                            && 
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                        try {
-                            TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
-                            if (topicPolicies != null) {
-                                persistencePolicies = 
topicPolicies.getPersistence();
-                                retentionPolicies = 
topicPolicies.getRetentionPolicies();
-                                topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
-                            }
-                        } catch 
(BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-                            log.debug("Topic {} policies have not been 
initialized yet.", topicName);
-                        }
-                    }
-
-                    if (persistencePolicies == null) {
-                        persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
-                                () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                        
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                        
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                        
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-                    }
-
-                    if (retentionPolicies == null) {
-                        retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-                                () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                        
serviceConfig.getDefaultRetentionSizeInMB())
-                        );
-                    }
-
+        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+        if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+            && !NamespaceService.isSystemServiceNamespace(namespace.toString())
+            && 
!EventsTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+            topicPoliciesFuture = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+        } else {
+            topicPoliciesFuture = 
CompletableFuture.completedFuture(Optional.empty());
+        }
+        return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
+            final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
+            final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
+            return nsPolicies.thenCombine(lcPolicies, (policies, 
localPolicies) -> {
+                PersistencePolicies persistencePolicies = null;
+                RetentionPolicies retentionPolicies = null;
+                OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+                if (topicPoliciesOptional.isPresent()) {
+                    final TopicPolicies topicPolicies = 
topicPoliciesOptional.get();
+                    persistencePolicies = topicPolicies.getPersistence();
+                    retentionPolicies = topicPolicies.getRetentionPolicies();
+                    topicLevelOffloadPolicies = 
topicPolicies.getOffloadPolicies();
+                }
 
-                    ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
-                    
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-                    
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-                    
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        managedLedgerConfig
-                                .setBookKeeperEnsemblePlacementPolicyClassName(
-                                        
IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = Maps.newHashMap();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                    }
-                    
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-                    
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-                    
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+                if (persistencePolicies == null) {
+                    persistencePolicies = policies.map(p -> 
p.persistence).orElseGet(
+                            () -> new 
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                    
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                    
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                    
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+                }
 
-                    managedLedgerConfig
-                            
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-                    managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
-                            
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
-                    
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-                    managedLedgerConfig
-                            
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                                    TimeUnit.MINUTES);
-                    managedLedgerConfig
-                            
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                                    TimeUnit.MINUTES);
-                    
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+                if (retentionPolicies == null) {
+                    retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+                            () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    
serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
 
-                    managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                            
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-                    managedLedgerConfig
-                            
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-                    managedLedgerConfig
-                            
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-                    
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-                    managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                            
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-                    
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-                    
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-                    managedLedgerConfig
-                            
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
 
+                ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
+                
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+                
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+                
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+                if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
                     managedLedgerConfig
-                            
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-                    managedLedgerConfig
-                            
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
-                    
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-                    
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-                    
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-                    managedLedgerConfig.setInactiveLedgerRollOverTime(
-                            
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
-                    managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
-                            
serviceConfig.isCacheEvictionByMarkDeletedPosition());
-
-                    OffloadPoliciesImpl nsLevelOffloadPolicies =
-                            (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
-                    OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
-                            topicLevelOffloadPolicies,
-                            
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
-                            getPulsar().getConfig().getProperties());
-                    if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                        
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
-                    } else  {
-                        if (topicLevelOffloadPolicies != null) {
-                            try {
-                                LedgerOffloader topicLevelLedgerOffLoader =
-                                        
pulsar().createManagedLedgerOffloader(offloadPolicies);
-                                
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                            } catch (PulsarServerException e) {
-                                throw new RuntimeException(e);
-                            }
-                        } else {
-                            //If the topic level policy is null, use the 
namespace level
-                            managedLedgerConfig
-                                    
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
+                            .setBookKeeperEnsemblePlacementPolicyClassName(
+                                    
IsolatedBookieEnsemblePlacementPolicy.class);
+                    Map<String, Object> properties = Maps.newHashMap();
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                            
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                    
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                            
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                    
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                }
+                
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+                
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+                
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+                managedLedgerConfig
+                        
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+                managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
+                        
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+                
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+                managedLedgerConfig
+                        
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                                TimeUnit.MINUTES);
+                managedLedgerConfig
+                        
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                                TimeUnit.MINUTES);
+                
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                        
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+                managedLedgerConfig
+                        
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+                managedLedgerConfig
+                        
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+                
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                        
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+                
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+                
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+                managedLedgerConfig
+                        
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+                managedLedgerConfig
+                        
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+                managedLedgerConfig
+                        
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
+                
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+                
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+                
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+                managedLedgerConfig.setInactiveLedgerRollOverTime(
+                        
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
+                managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+                        serviceConfig.isCacheEvictionByMarkDeletedPosition());
+
+                OffloadPoliciesImpl nsLevelOffloadPolicies =
+                        (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);
+                OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(
+                        topicLevelOffloadPolicies,
+                        
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
+                        getPulsar().getConfig().getProperties());
+                if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                    
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+                } else  {
+                    if (topicLevelOffloadPolicies != null) {
+                        try {
+                            LedgerOffloader topicLevelLedgerOffLoader =
+                                    
pulsar().createManagedLedgerOffloader(offloadPolicies);
+                            
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                        } catch (PulsarServerException e) {
+                            throw new RuntimeException(e);
                         }
+                    } else {
+                        //If the topic level policy is null, use the namespace 
level
+                        managedLedgerConfig
+                                
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
                     }
+                }
 
-                    managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-                            
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-                    managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-                            
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                        
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                        
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
 
-                    return managedLedgerConfig;
-                });
+                return managedLedgerConfig;
+            });
+        });
     }
 
     private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 419c6d88b22..daa2b249a9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -25,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -78,8 +80,8 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 
     private final Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
             readerCaches = new ConcurrentHashMap<>();
-    @VisibleForTesting
-    final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
+
+    final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new 
ConcurrentHashMap<>();
 
     @VisibleForTesting
     final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = 
new ConcurrentHashMap<>();
@@ -219,12 +221,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                           boolean isGlobal) throws 
TopicPoliciesCacheNotInitException {
         if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
             NamespaceName namespace = topicName.getNamespaceObject();
-            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+            prepareInitPoliciesCacheAsync(namespace);
         }
 
         MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result 
= new MutablePair<>();
         policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, 
initialized) -> {
-            if (initialized == null || !initialized) {
+            if (initialized == null || !initialized.isDone()) {
                 result.setLeft(new TopicPoliciesCacheNotInitException());
             } else {
                 TopicPolicies topicPolicies =
@@ -242,6 +244,34 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
+    @Nonnull
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@Nonnull TopicName topicName,
+                                                                            
boolean isGlobal) {
+        requireNonNull(topicName);
+        final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+        return preparedFuture.thenApply(__ -> {
+            final TopicPolicies candidatePolicies = isGlobal
+                    ? 
globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+                    : 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+            return Optional.ofNullable(candidatePolicies);
+        });
+    }
+
+    @Nonnull
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@Nonnull TopicName topicName) {
+        requireNonNull(topicName);
+        final CompletableFuture<Void> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+        return preparedFuture.thenApply(__ -> {
+            final TopicPolicies localPolicies = 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+            if (localPolicies != null) {
+                return Optional.of(localPolicies);
+            }
+            return 
Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
+        });
+    }
+
     @Override
     public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
         return 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -265,40 +295,49 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
         if (NamespaceService.checkHeartbeatNamespace(namespace) != null
                 || NamespaceService.checkHeartbeatNamespaceV2(namespace) != 
null) {
-            result.complete(null);
-            return result;
+            return CompletableFuture.completedFuture(null);
         }
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
-                result.complete(null);
+                return CompletableFuture.completedFuture(null);
             } else {
-                prepareInitPoliciesCache(namespace, result);
+                return prepareInitPoliciesCacheAsync(namespace);
             }
         }
-        return result;
     }
 
-    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, 
CompletableFuture<Void> result) {
-        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
-            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+    private @Nonnull CompletableFuture<Void> 
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+        requireNonNull(namespace);
+        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-            readerCompletableFuture.thenAccept(reader -> {
-                initPolicesCache(reader, result);
-                result.thenRun(() -> readMorePolicies(reader));
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-                cleanCacheAndCloseReader(namespace, false);
-                result.completeExceptionally(ex);
+            final CompletableFuture<Void> initFuture = readerCompletableFuture
+                    .thenCompose(reader -> {
+                        final CompletableFuture<Void> stageFuture = new 
CompletableFuture<>();
+                        initPolicesCache(reader, stageFuture);
+                        return stageFuture
+                                // Read policies in background
+                                .thenAccept(__ -> 
readMorePoliciesAsync(reader));
+                    });
+            initFuture.exceptionally(ex -> {
+                try {
+                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
+                    cleanCacheAndCloseReader(namespace, false);
+                } catch (Throwable cleanupEx) {
+                    // Adding this catch to avoid breaking the callback chain
+                    log.error("[{}] Failed to cleanup reader on 
__change_events topic", namespace, cleanupEx);
+                }
                 return null;
             });
-        }
+            // let caller know we've got an exception.
+            return initFuture;
+        });
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClientWithRetry(
@@ -382,8 +421,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Reach the end of the system topic.", 
reader.getSystemTopic().getTopicName());
                 }
-                policyCacheInitMap.computeIfPresent(
-                        
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
@@ -396,6 +434,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         }
                     }
                 }));
+
                 future.complete(null);
             }
         });
@@ -421,7 +460,13 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> 
reader) {
+    /**
+     * This is an async method for the background reader to continue syncing 
new messages.
+     *
+     * Note: Do not make any blocking call here, because it would cause the
+     * {@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block topic loading.
+     */
+    private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         reader.readNextAsync()
                 .thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
@@ -429,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 })
                 .whenComplete((__, ex) -> {
                     if (ex == null) {
-                        readMorePolicies(reader);
+                        readMorePoliciesAsync(reader);
                     } else {
                         Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
@@ -438,7 +483,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         } else {
                             log.warn("Read more topic polices exception, read 
again.", ex);
-                            readMorePolicies(reader);
+                            readMorePoliciesAsync(reader);
                         }
                     }
                 });
@@ -591,7 +636,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @VisibleForTesting
-    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+    public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName 
namespaceName) {
         return policyCacheInitMap.get(namespaceName);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 5b2aa6e8ce9..98c8e71bee0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -109,6 +110,32 @@ public interface TopicPoliciesService {
         return response;
     }
 
+    /**
+     * Asynchronously retrieves topic policies.
+     * This triggers the Pulsar broker's internal client to load policies from 
the
+     * system topic `persistent://tenant/namespace/__change_events`.
+     *
+     * @param topicName The name of the topic.
+     * @param isGlobal Indicates if the policies are global.
+     * @return A CompletableFuture containing an Optional of TopicPolicies.
+     * @throws NullPointerException If the topicName is null.
+     */
+    @Nonnull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName, boolean isGlobal);
+
+    /**
+     * Asynchronously retrieves topic policies.
+     * This triggers the Pulsar broker's internal client to load policies from 
the
+     * system topic `persistent://tenant/namespace/__change_events`.
+     *
+     * NOTE: If local policies are not available, it will fall back to using 
topic global policies.
+     * @param topicName The name of the topic.
+     * @return A CompletableFuture containing an Optional of TopicPolicies.
+     * @throws NullPointerException If the topicName is null.
+     */
+    @Nonnull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName);
+
     /**
      * Get policies for a topic without cache async.
      * @param topicName topic name
@@ -162,6 +189,19 @@ public interface TopicPoliciesService {
             return null;
         }
 
+        @Nonnull
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@Nonnull TopicName topicName,
+                                                                               
 boolean isGlobal) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+
+        @Nonnull
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@Nonnull TopicName topicName) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+
         @Override
         public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
             return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f3e3188de4b..adfa95e8f4d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -169,7 +169,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
         //make sure namespace policy reader is fully started.
         Awaitility.await().untilAsserted(()-> {
-            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
         });
 
         //load the topic.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
new file mode 100644
index 00000000000..f39981eec48
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class TopicPoliciesWithBrokerRestartTest extends 
MockedPulsarServiceBaseTest {
+
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+    }
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+
+    @Test
+    public void testRetentionWithBrokerRestart() throws Exception {
+        final int messages = 1_000;
+        final int topicNum = 500;
+        // (1) Init topic
+        admin.namespaces().createNamespace("public/retention");
+        final String topicName = 
"persistent://public/retention/retention_with_broker_restart";
+        admin.topics().createNonPartitionedTopic(topicName);
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.topics().createNonPartitionedTopic(shadowTopicNames);
+        }
+        // (2) Set retention
+        final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 
20);
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.topicPolicies().setRetention(shadowTopicNames, 
retentionPolicies);
+        }
+        admin.topicPolicies().setRetention(topicName, retentionPolicies);
+        // (3) Send messages
+        @Cleanup
+        final Producer<byte[]> publisher = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < messages; i++) {
+            publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
+        }
+        // (4) Check configuration
+        Awaitility.await().untilAsserted(() -> {
+            final PersistentTopic persistentTopic1 = (PersistentTopic)
+                    pulsar.getBrokerService().getTopic(topicName, 
true).join().get();
+            final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
+            
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
+            
Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
+                    TimeUnit.MINUTES.toMillis(20));
+        });
+        // (5) Restart broker
+        restartBroker();
+        // (6) Check configuration again
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.lookups().lookupTopic(shadowTopicNames);
+            final PersistentTopic persistentTopicTmp = (PersistentTopic)
+                    pulsar.getBrokerService().getTopic(shadowTopicNames, 
true).join().get();
+            final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) 
persistentTopicTmp.getManagedLedger();
+            
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
+            
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
+                    TimeUnit.MINUTES.toMillis(20));
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index c90c4a3a5bc..e69230b2d2d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -142,7 +142,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         // Wait for all topic policies updated.
         Awaitility.await().untilAsserted(() ->
                 Assert.assertTrue(systemTopicBasedTopicPoliciesService
-                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
+                        
.getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
 
         // Assert broker is cache all topic policies
         Awaitility.await().untilAsserted(() ->
@@ -305,8 +305,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
     @Test
     public void testGetPolicyTimeout() throws Exception {
         SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
-        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
-        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
+        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
+        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new 
CompletableFuture<>());
         long start = System.currentTimeMillis();
         Backoff backoff = new BackoffBuilder()
                 .setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 3b018e70481..dac7f4ab2aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -923,9 +923,9 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
     }
 
     private int calculateLookupRequestCount() throws Exception {
-        int failures = 
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures_total")
+        int failures = 
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures")
                 .intValue();
-        int answers = 
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers_total")
+        int answers = 
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers")
                 .intValue();
         return failures + answers;
     }


Reply via email to