This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 344905f136a [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
344905f136a is described below

commit 344905f136af9dbc6b132d12b4a7050a8964055d
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Nov 5 02:46:05 2025 +0800

    [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 44 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 7e3590f1bb8..8287583a3d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -588,7 +588,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                     // Read policies in 
background
                                                     .thenAccept(__ -> 
readMorePoliciesAsync(reader));
                                         });
-                                initFuture.exceptionally(ex -> {
+                                initFuture.exceptionallyAsync(ex -> {
                                     try {
                                         if (closed.get()) {
                                             return null;
@@ -601,7 +601,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                 namespace, cleanupEx);
                                     }
                                     return null;
-                                });
+                                }, pulsarService.getExecutor());
                                 // let caller know we've got an exception.
                                 return initFuture;
                             }).thenApply(__ -> true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 8149b7a9435..607a8e3f9c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -25,6 +25,7 @@ import static org.testng.AssertJUnit.assertNull;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -421,4 +422,47 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         service.prepareInitPoliciesCacheAsync(namespaceName).get();
         admin.namespaces().deleteNamespace(NAMESPACE5);
     }
+
+    @Test
+    public void testCreateNamespaceEventsSystemTopicFactoryException() throws 
Exception {
+        final String namespace = "system-topic/namespace-6";
+
+        admin.namespaces().createNamespace(namespace);
+
+        TopicName topicName = TopicName.get("persistent", 
NamespaceName.get(namespace), "topic-1");
+
+        SystemTopicBasedTopicPoliciesService service =
+            Mockito.spy((SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService());
+
+        // inject exception when create NamespaceEventsSystemTopicFactory
+        Mockito.doThrow(new RuntimeException("test exception")).when(service)
+            .getNamespaceEventsSystemTopicFactory();
+
+        CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+        Optional<TopicPolicies> topicPoliciesOptional;
+        try {
+            topicPoliciesFuture =
+            service.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY);
+            topicPoliciesOptional = topicPoliciesFuture.join();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getMessage().contains("test 
exception"));
+        }
+
+        Mockito.reset(service);
+
+        service.updateTopicPoliciesAsync(topicName, false, false, 
topicPolicies ->
+            topicPolicies.setMaxConsumerPerTopic(10)).get();
+
+        topicPoliciesFuture =
+            service.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY);
+        topicPoliciesOptional = topicPoliciesFuture.join();
+
+        Assert.assertNotNull(topicPoliciesOptional);
+        Assert.assertTrue(topicPoliciesOptional.isPresent());
+
+        TopicPolicies topicPolicies = topicPoliciesOptional.get();
+        Assert.assertNotNull(topicPolicies);
+        Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
+    }
 }

Reply via email to