This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9d0740b47a16cf21d8c53f74198d364a4ce4b516
Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com>
AuthorDate: Wed Apr 6 15:41:17 2022 +0800

    [improve][broker] Avoid using blocking calls for the async method 
``checkTopicOwnership`` (#15023)
    
    (cherry picked from commit c59402ef09c870469a4d5ff835fa6222518704b9)
---
 .../apache/pulsar/broker/namespace/NamespaceService.java    |  2 +-
 .../org/apache/pulsar/broker/namespace/OwnershipCache.java  | 13 +++++++++++--
 .../apache/pulsar/broker/namespace/OwnershipCacheTest.java  |  2 +-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 058354cd01a..e576e864467 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1031,7 +1031,7 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) 
{
         return getBundleAsync(topicName)
-                .thenApply(ownershipCache::checkOwnership);
+                .thenCompose(ownershipCache::checkOwnershipAsync);
     }
 
     public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws 
Exception {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index daedb712e29..fc014414f5e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -148,8 +148,13 @@ public class OwnershipCache {
      * @param bundle namespace bundle
      * @return future that will complete with check result
      */
-    public boolean checkOwnership(NamespaceBundle bundle) {
-        return getOwnedBundle(bundle) != null;
+    public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle 
bundle) {
+        Optional<CompletableFuture<OwnedBundle>> ownedBundleFuture = 
getOwnedBundleAsync(bundle);
+        if (!ownedBundleFuture.isPresent()) {
+            return CompletableFuture.completedFuture(false);
+        }
+        return ownedBundleFuture.get()
+                .thenApply(bd -> bd != null && bd.isActive());
     }
 
     /**
@@ -277,6 +282,10 @@ public class OwnershipCache {
         }
     }
 
+    public Optional<CompletableFuture<OwnedBundle>> 
getOwnedBundleAsync(NamespaceBundle bundle) {
+        return Optional.ofNullable(ownedBundlesCache.getIfPresent(bundle));
+    }
+
     /**
      * Disable bundle in local cache and on zk.
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 143d5ef78f5..dde25fa2eed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -401,7 +401,7 @@ public class OwnershipCacheTest {
         assertFalse(data3.isDisabled());
         assertNotNull(cache.getOwnedBundle(testFullBundle));
 
-        assertTrue(cache.checkOwnership(testFullBundle));
+        assertTrue(cache.checkOwnershipAsync(testFullBundle).get());
         assertEquals(data2.getNativeUrl(), selfBrokerUrl);
         assertFalse(data2.isDisabled());
         assertNotNull(cache.getOwnedBundle(testFullBundle));

Reply via email to