This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c2ea6a9afeee9f436d9c44564b5f9e8d343e334c Author: Cong Zhao <[email protected]> AuthorDate: Wed Nov 5 02:46:05 2025 +0800 [fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939) (cherry picked from commit 344905f136af9dbc6b132d12b4a7050a8964055d) --- .../SystemTopicBasedTopicPoliciesService.java | 4 +- .../SystemTopicBasedTopicPoliciesServiceTest.java | 54 ++++++++++++++++++++-- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 880498209a6..c745d591a4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -589,7 +589,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic // Read policies in background .thenAccept(__ -> readMorePoliciesAsync(reader)); }); - initFuture.exceptionally(ex -> { + initFuture.exceptionallyAsync(ex -> { try { if (closed.get()) { return null; @@ -603,7 +603,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic namespace, cleanupEx); } return null; - }); + }, pulsarService.getExecutor()); // let caller know we've got an exception. return initFuture; }).thenApply(__ -> true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 8149b7a9435..4326f83d763 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.broker.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.spy; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; +import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -45,7 +48,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; -import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; @@ -319,12 +321,12 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic spy(new ConcurrentHashMap<TopicName, TopicPolicies>()); FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true); - Awaitility.await().untilAsserted(() -> Assertions.assertThat( + Awaitility.await().untilAsserted(() -> assertThat( TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, TopicName.get(topic))).isNull()); admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), + assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))).isNotNull(); }); @@ -421,4 +423,50 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic service.prepareInitPoliciesCacheAsync(namespaceName).get(); admin.namespaces().deleteNamespace(NAMESPACE5); } + + @Test + public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception { + final String namespace = "system-topic/namespace-6"; + + admin.namespaces().createNamespace(namespace); + + TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1"); + + SystemTopicBasedTopicPoliciesService service = + Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService()); + + // inject exception when create NamespaceEventsSystemTopicFactory + Mockito.doThrow(new RuntimeException("test exception")) + .doCallRealMethod() + .when(service) + .getNamespaceEventsSystemTopicFactory(); + + CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture; + Optional<TopicPolicies> topicPoliciesOptional; + try { + topicPoliciesFuture = + service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY); + topicPoliciesOptional = topicPoliciesFuture.join(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause().getMessage().contains("test exception")); + } + + Awaitility.await().untilAsserted(() -> { + assertThat(service.updateTopicPoliciesAsync(topicName, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(10))) + .succeedsWithin(Duration.ofSeconds(2)); + }); + + topicPoliciesFuture = + service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY); + topicPoliciesOptional = topicPoliciesFuture.join(); + + Assert.assertNotNull(topicPoliciesOptional); + Assert.assertTrue(topicPoliciesOptional.isPresent()); + + TopicPolicies topicPolicies = topicPoliciesOptional.get(); + Assert.assertNotNull(topicPolicies); + Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10); + } }
