This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0868e21eb347f6c50302c8504f6bd640c1013101
Author: 道君- Tao Jiuming <[email protected]>
AuthorDate: Tue Sep 9 11:31:12 2025 +0800

    [improve][broker] Optimize Reader creation in TopicPoliciesService (#24658)
    
    Co-authored-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 0cda4f400b0cb785447fc1a64edb4f97c17c309a)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 93 ++++++++++++++++++----
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  6 ++
 2 files changed, 84 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index c745d591a4e..8287583a3d7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -579,8 +579,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
                             return 
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
                                 final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                                        createSystemTopicClient(namespace);
-                                readerCaches.put(namespace, 
readerCompletableFuture);
+                                        newReader(namespace);
                                 final CompletableFuture<Void> initFuture = 
readerCompletableFuture
                                         .thenCompose(reader -> {
                                             final CompletableFuture<Void> 
stageFuture = new CompletableFuture<>();
@@ -594,9 +593,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                         if (closed.get()) {
                                             return null;
                                         }
-                                        log.error("[{}] Failed to create 
reader on __change_events topic",
-                                                namespace, ex);
-                                        cleanCacheAndCloseReader(namespace, 
false);
+                                        cleanPoliciesCacheInitMap(
+                                                namespace, 
readerCompletableFuture.isCompletedExceptionally());
                                     } catch (Throwable cleanupEx) {
                                         // Adding this catch to avoid break 
callback chain
                                         log.error("[{}] Failed to cleanup 
reader on __change_events topic",
@@ -610,6 +608,20 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         });
     }
 
+    private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
newReader(NamespaceName ns) {
+        return readerCaches.compute(ns, (__, existingFuture) -> {
+            if (existingFuture == null) {
+                return createSystemTopicClient(ns);
+            }
+
+            if (existingFuture.isDone() && 
existingFuture.isCompletedExceptionally()) {
+                return existingFuture.exceptionallyCompose(ex ->
+                        isAlreadyClosedException(ex) ? existingFuture : 
createSystemTopicClient(ns));
+            }
+            return existingFuture;
+        });
+    }
+
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClient(
             NamespaceName namespace) {
         if (closed.get()) {
@@ -633,7 +645,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
         AtomicInteger bundlesCount = 
ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
-            cleanCacheAndCloseReader(namespace, true, true);
+            cleanPoliciesCacheInitMap(namespace, true);
+            cleanWriterCache(namespace);
+            cleanOwnedBundlesCount(namespace);
         }
     }
 
@@ -665,7 +679,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
         if (closed.get()) {
             future.completeExceptionally(new 
BrokerServiceException(getClass().getName() + " is closed."));
-            
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+            
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 true);
             return;
         }
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
@@ -673,7 +687,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 log.error("[{}] Failed to check the move events for the system 
topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+                
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
+                        isAlreadyClosedException(ex));
                 return;
             }
             if (hasMore) {
@@ -692,7 +707,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     log.error("[{}] Failed to read event from the system 
topic.",
                             reader.getSystemTopic().getTopicName(), e);
                     future.completeExceptionally(e);
-                    
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+                    
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
+                            isAlreadyClosedException(ex));
                     return null;
                 });
             } else {
@@ -718,10 +734,45 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, 
boolean cleanOwnedBundlesCount) {
-        cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
+
+    private void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, 
boolean closeReader) {
+        if (!closeReader) {
+            policyCacheInitMap.remove(namespace);
+            return;
+        }
+
+        TopicPolicyMessageHandlerTracker topicPolicyMessageHandlerTracker =
+                topicPolicyMessageHandlerTrackers.remove(namespace);
+        if (topicPolicyMessageHandlerTracker != null) {
+            topicPolicyMessageHandlerTracker.close();
+        }
+
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture 
= readerCaches.remove(namespace);
+        policyCacheInitMap.compute(namespace, (k, v) -> {
+            policiesCache.entrySet().removeIf(entry -> 
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+            globalPoliciesCache.entrySet()
+                    .removeIf(entry -> 
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+            return null;
+        });
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            readerFuture
+                    .thenCompose(SystemTopicClient.Reader::closeAsync)
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Close change_event reader fail.", 
namespace, ex);
+                        return null;
+                    });
+        }
+    }
+
+    private void cleanWriterCache(@NonNull NamespaceName namespace) {
+        writerCaches.synchronous().invalidate(namespace);
     }
 
+    private void cleanOwnedBundlesCount(@NonNull NamespaceName namespace) {
+        ownedBundlesCountPerNamespace.remove(namespace);
+    }
+
+
     private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, 
boolean cleanOwnedBundlesCount,
                                           boolean cleanWriterCache) {
         if (cleanWriterCache) {
@@ -754,6 +805,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
+
+
+
     /**
      * This is an async method for the background reader to continue syncing 
new messages.
      *
@@ -763,7 +817,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         NamespaceName namespaceObject = 
reader.getSystemTopic().getTopicName().getNamespaceObject();
         if (closed.get()) {
-            cleanCacheAndCloseReader(namespaceObject, false);
+            cleanPoliciesCacheInitMap(namespaceObject, true);
             return;
         }
         reader.readNextAsync()
@@ -784,11 +838,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     if (ex == null) {
                         readMorePoliciesAsync(reader);
                     } else {
-                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
-                        if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
+                        if (isAlreadyClosedException(ex)) {
                             log.info("Closing the topic policies reader for 
{}",
                                     reader.getSystemTopic().getTopicName());
-                            cleanCacheAndCloseReader(namespaceObject, false);
+                            cleanPoliciesCacheInitMap(namespaceObject, true);
                         } else {
                             log.warn("Read more topic polices exception, read 
again.", ex);
                             readMorePoliciesAsync(reader);
@@ -797,6 +850,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 });
     }
 
+    private boolean isAlreadyClosedException(Throwable ex) {
+        Throwable cause = FutureUtil.unwrapCompletionException(ex);
+        return cause instanceof PulsarClientException.AlreadyClosedException;
+    }
+
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
         // delete policies
         if (msg.getValue() == null) {
@@ -884,6 +942,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
 
+    @VisibleForTesting
+    public Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> getReaderCaches() {
+        return readerCaches;
+    }
+
     @VisibleForTesting
     long getPoliciesCacheSize() {
         return policiesCache.size();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f8c3f580fc1..8d8e7c385b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -64,6 +64,7 @@ import 
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -77,6 +78,7 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -3578,6 +3580,10 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         policyCacheInitMap.clear();
         policiesCache.clear();
         globalPoliciesCache.clear();
+
+        Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches =
+                ((SystemTopicBasedTopicPoliciesService) 
topicPoliciesService).getReaderCaches();
+        readerCaches.clear();
     }
 
     @DataProvider(name = "reloadPolicyTypes")

Reply via email to