mattisonchao commented on code in PR #21231: URL: https://github.com/apache/pulsar/pull/21231#discussion_r1335350369
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java: ########## @@ -265,39 +296,48 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa @Override public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { - CompletableFuture<Void> result = new CompletableFuture<>(); NamespaceName namespace = namespaceBundle.getNamespaceObject(); if (NamespaceService.isHeartbeatNamespace(namespace)) { - result.complete(null); - return result; + return CompletableFuture.completedFuture(null); } synchronized (this) { if (readerCaches.get(namespace) != null) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); - result.complete(null); + return CompletableFuture.completedFuture(null); } else { - prepareInitPoliciesCache(namespace, result); + return prepareInitPoliciesCacheAsync(namespace); } } - return result; } - private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) { - if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { - CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = + private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { + requireNonNull(namespace); + return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { + final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = createSystemTopicClientWithRetry(namespace); readerCaches.put(namespace, readerCompletableFuture); ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); - readerCompletableFuture.thenAccept(reader -> { - initPolicesCache(reader, result); - result.thenRun(() -> readMorePolicies(reader)); - }).exceptionally(ex -> { - log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); - cleanCacheAndCloseReader(namespace, false); - result.completeExceptionally(ex); 
+ final CompletableFuture<Void> initFuture = readerCompletableFuture + .thenCompose(reader -> { + final CompletableFuture<Void> stageFuture = new CompletableFuture<>(); + initPolicesCache(reader, stageFuture); + return stageFuture + // Read policies in background + .thenAccept(__ -> readMorePolicies(reader)); Review Comment: We should call this async method (`readMorePolicies`) to let the reader read the next message. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org