This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 344905f136a [fix][broker] Avoid recursive update in ConcurrentHashMap
during policy cache cleanup (#24939)
344905f136a is described below
commit 344905f136af9dbc6b132d12b4a7050a8964055d
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Nov 5 02:46:05 2025 +0800
[fix][broker] Avoid recursive update in ConcurrentHashMap during policy
cache cleanup (#24939)
---
.../SystemTopicBasedTopicPoliciesService.java | 4 +-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 44 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 7e3590f1bb8..8287583a3d7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -588,7 +588,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
// Read policies in
background
.thenAccept(__ ->
readMorePoliciesAsync(reader));
});
- initFuture.exceptionally(ex -> {
+ initFuture.exceptionallyAsync(ex -> {
try {
if (closed.get()) {
return null;
@@ -601,7 +601,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
namespace, cleanupEx);
}
return null;
- });
+ }, pulsarService.getExecutor());
// let caller know we've got an exception.
return initFuture;
}).thenApply(__ -> true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 8149b7a9435..607a8e3f9c2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -25,6 +25,7 @@ import static org.testng.AssertJUnit.assertNull;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -421,4 +422,47 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
service.prepareInitPoliciesCacheAsync(namespaceName).get();
admin.namespaces().deleteNamespace(NAMESPACE5);
}
+
+ @Test
+ public void testCreateNamespaceEventsSystemTopicFactoryException() throws
Exception {
+ final String namespace = "system-topic/namespace-6";
+
+ admin.namespaces().createNamespace(namespace);
+
+ TopicName topicName = TopicName.get("persistent",
NamespaceName.get(namespace), "topic-1");
+
+ SystemTopicBasedTopicPoliciesService service =
+ Mockito.spy((SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService());
+
+ // inject exception when create NamespaceEventsSystemTopicFactory
+ Mockito.doThrow(new RuntimeException("test exception")).when(service)
+ .getNamespaceEventsSystemTopicFactory();
+
+ CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+ Optional<TopicPolicies> topicPoliciesOptional;
+ try {
+ topicPoliciesFuture =
+ service.getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY);
+ topicPoliciesOptional = topicPoliciesFuture.join();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause().getMessage().contains("test
exception"));
+ }
+
+ Mockito.reset(service);
+
+ service.updateTopicPoliciesAsync(topicName, false, false,
topicPolicies ->
+ topicPolicies.setMaxConsumerPerTopic(10)).get();
+
+ topicPoliciesFuture =
+ service.getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY);
+ topicPoliciesOptional = topicPoliciesFuture.join();
+
+ Assert.assertNotNull(topicPoliciesOptional);
+ Assert.assertTrue(topicPoliciesOptional.isPresent());
+
+ TopicPolicies topicPolicies = topicPoliciesOptional.get();
+ Assert.assertNotNull(topicPolicies);
+ Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
+ }
}