This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d4610a0d60139f8660c9d58bf2d8a7981f03d19a Author: hanmz <[email protected]> AuthorDate: Mon Mar 18 06:45:02 2024 +0800 [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268) (cherry picked from commit 96d77f7e1d5b9c56070eaed5c31213a8144871d3) --- .../SystemTopicBasedTopicPoliciesService.java | 66 +++++++++++++--------- .../SystemTopicBasedTopicPoliciesServiceTest.java | 19 +++++++ 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 71f78e21f93..4e9e875bcf4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -324,34 +324,46 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } - private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { + @VisibleForTesting + @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { requireNonNull(namespace); - return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { - final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - createSystemTopicClient(namespace); - readerCaches.put(namespace, readerCompletableFuture); - ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); - final CompletableFuture<Void> initFuture = readerCompletableFuture - .thenCompose(reader -> { - final CompletableFuture<Void> stageFuture = new CompletableFuture<>(); - initPolicesCache(reader, stageFuture); - return stageFuture - // Read policies in background - .thenAccept(__ -> readMorePoliciesAsync(reader)); - }); - initFuture.exceptionally(ex -> { - try { - log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); - cleanCacheAndCloseReader(namespace, false); - } catch (Throwable cleanupEx) { - // Adding this catch to avoid break callback chain - log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx); - } - return null; - }); - // let caller know we've got an exception. - return initFuture; - }); + return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) + .thenCompose(namespacePolicies -> { + if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { + log.info("[{}] skip prepare init policies cache since the namespace is deleted", + namespace); + return CompletableFuture.completedFuture(null); + } + + return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { + final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = + createSystemTopicClient(namespace); + readerCaches.put(namespace, readerCompletableFuture); + ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); + final CompletableFuture<Void> initFuture = readerCompletableFuture + .thenCompose(reader -> { + final CompletableFuture<Void> stageFuture = new CompletableFuture<>(); + initPolicesCache(reader, stageFuture); + return stageFuture + // Read policies in background + .thenAccept(__ -> readMorePoliciesAsync(reader)); + }); + initFuture.exceptionally(ex -> { + try { + log.error("[{}] Failed to create reader on __change_events topic", + namespace, ex); + cleanCacheAndCloseReader(namespace, false); + } catch (Throwable cleanupEx) { + // Adding this catch to avoid break callback chain + log.error("[{}] Failed to cleanup reader on __change_events topic", + namespace, cleanupEx); + } + return null; + }); + // let caller know we've got an exception. + return initFuture; + }); + }); } protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 343a18da6d8..e571da13435 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -71,6 +71,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic private static final String NAMESPACE4 = "system-topic/namespace-4"; + private static final String NAMESPACE5 = "system-topic/namespace-5"; + private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1"); private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2"); private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1"); @@ -464,4 +466,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic admin.namespaces().deleteNamespace(NAMESPACE4); Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4))); } + + @Test + public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() throws Exception { + SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + admin.namespaces().createNamespace(NAMESPACE5); + + NamespaceName namespaceName = NamespaceName.get(NAMESPACE5); + pulsar.getPulsarResources().getNamespaceResources().setPolicies(namespaceName, + old -> { + old.deleted = true; + return old; + }); + + assertNull(service.getPoliciesCacheInit(namespaceName)); + service.prepareInitPoliciesCacheAsync(namespaceName).get(); + admin.namespaces().deleteNamespace(NAMESPACE5); + } }
