This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9d0740b47a16cf21d8c53f74198d364a4ce4b516 Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com> AuthorDate: Wed Apr 6 15:41:17 2022 +0800 [improve][broker] Avoid using blocking calls for the async method ``checkTopicOwnership`` (#15023) (cherry picked from commit c59402ef09c870469a4d5ff835fa6222518704b9) --- .../apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../org/apache/pulsar/broker/namespace/OwnershipCache.java | 13 +++++++++++-- .../apache/pulsar/broker/namespace/OwnershipCacheTest.java | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 058354cd01a..e576e864467 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1031,7 +1031,7 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) { return getBundleAsync(topicName) - .thenApply(ownershipCache::checkOwnership); + .thenCompose(ownershipCache::checkOwnershipAsync); } public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index daedb712e29..fc014414f5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -148,8 +148,13 @@ public class OwnershipCache { * @param bundle namespace bundle * @return future that will complete with check result */ - public boolean checkOwnership(NamespaceBundle bundle) { - return getOwnedBundle(bundle) != null; + public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle bundle) { + Optional<CompletableFuture<OwnedBundle>> ownedBundleFuture = getOwnedBundleAsync(bundle); + if (!ownedBundleFuture.isPresent()) { + return CompletableFuture.completedFuture(false); + } + return ownedBundleFuture.get() + .thenApply(bd -> bd != null && bd.isActive()); } /** @@ -277,6 +282,10 @@ public class OwnershipCache { } } + public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBundle bundle) { + return Optional.ofNullable(ownedBundlesCache.getIfPresent(bundle)); + } + /** * Disable bundle in local cache and on zk. * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index 143d5ef78f5..dde25fa2eed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -401,7 +401,7 @@ public class OwnershipCacheTest { assertFalse(data3.isDisabled()); assertNotNull(cache.getOwnedBundle(testFullBundle)); - assertTrue(cache.checkOwnership(testFullBundle)); + assertTrue(cache.checkOwnershipAsync(testFullBundle).get()); assertEquals(data2.getNativeUrl(), selfBrokerUrl); assertFalse(data2.isDisabled()); assertNotNull(cache.getOwnedBundle(testFullBundle));