This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 183340776f3 [improve][broker] Make some methods of `BrokersBase` pure async. (#15280)
183340776f3 is described below

commit 183340776f3a5248ec384ea6f94fff7c3f38bc03
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Apr 27 17:10:40 2022 +0800

    [improve][broker] Make some methods of `BrokersBase` pure async. (#15280)
---
 .../broker/resources/NamespaceResources.java       |  6 +++
 .../pulsar/broker/admin/impl/BrokersBase.java      | 62 ++++++++++++----------
 .../pulsar/broker/namespace/NamespaceService.java  | 35 ++++++------
 .../pulsar/broker/namespace/OwnershipCache.java    |  4 ++
 4 files changed, 62 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 3c35f32f36c..54122bbcf45 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -183,6 +183,12 @@ public class NamespaceResources extends 
BaseResources<Policies> {
             return data.isPresent() ? Optional.of(new 
NamespaceIsolationPolicies(data.get())) : Optional.empty();
         }
 
+        public CompletableFuture<NamespaceIsolationPolicies> 
getIsolationDataPoliciesAsync(String cluster) {
+            return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, 
NAMESPACE_ISOLATION_POLICIES))
+                    .thenApply(data -> 
data.map(NamespaceIsolationPolicies::new)
+                            .orElseGet(NamespaceIsolationPolicies::new));
+        }
+
         public void deleteIsolationData(String cluster) throws 
MetadataStoreException {
             delete(joinPath(BASE_CLUSTERS_PATH, cluster, 
NAMESPACE_ISOLATION_POLICIES));
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 8e9167a886c..7f6740e3ed2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -147,20 +147,23 @@ public class BrokersBase extends AdminResource {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the cluster"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Cluster doesn't exist") })
-    public Map<String, NamespaceOwnershipStatus> 
getOwnedNamespaces(@PathParam("clusterName") String cluster,
-            @PathParam("broker-webserviceurl") String broker) throws Exception 
{
-        validateSuperUserAccess();
-        validateClusterOwnership(cluster);
-        validateBrokerName(broker);
-
-        try {
-            // now we validated that this is the broker specified in the 
request
-            return pulsar().getNamespaceService().getOwnedNameSpacesStatus();
-        } catch (Exception e) {
-            LOG.error("[{}] Failed to get the namespace ownership status. 
cluster={}, broker={}", clientAppId(),
-                    cluster, broker);
-            throw new RestException(e);
-        }
+    public void getOwnedNamespaces(@Suspended final AsyncResponse 
asyncResponse,
+                                   @PathParam("clusterName") String cluster,
+                                   @PathParam("broker-webserviceurl") String 
broker) {
+        validateSuperUserAccessAsync()
+                .thenAccept(__ -> validateBrokerName(broker))
+                .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
+                .thenCompose(__ -> 
pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    // If the exception is not redirect exception we need to 
log it.
+                    if (!isRedirectException(ex)) {
+                        LOG.error("[{}] Failed to get the namespace ownership 
status. cluster={}, broker={}",
+                                clientAppId(), cluster, broker);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -219,17 +222,15 @@ public class BrokersBase extends AdminResource {
         @ApiResponse(code = 403, message = "You don't have admin permission to 
view configuration"),
         @ApiResponse(code = 404, message = "Configuration not found"),
         @ApiResponse(code = 500, message = "Internal server error")})
-    public Map<String, String> getAllDynamicConfigurations() throws Exception {
-        validateSuperUserAccess();
-        try {
-            return 
dynamicConfigurationResources().getDynamicConfiguration().orElseGet(Collections::emptyMap);
-        } catch (RestException e) {
-            LOG.error("[{}] couldn't find any configuration in zk {}", 
clientAppId(), e.getMessage(), e);
-            throw e;
-        } catch (Exception e) {
-            LOG.error("[{}] Failed to retrieve configuration from zk {}", 
clientAppId(), e.getMessage(), e);
-            throw new RestException(e);
-        }
+    public void getAllDynamicConfigurations(@Suspended AsyncResponse 
asyncResponse) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> 
dynamicConfigurationResources().getDynamicConfigurationAsync())
+                .thenAccept(configOpt -> 
asyncResponse.resume(configOpt.orElseGet(Collections::emptyMap)))
+                .exceptionally(ex -> {
+                    LOG.error("[{}] Failed to get all dynamic configuration.", 
clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -237,9 +238,14 @@ public class BrokersBase extends AdminResource {
     @ApiOperation(value = "Get all updatable dynamic configurations's name")
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "You don't have admin 
permission to get configuration")})
-    public List<String> getDynamicConfigurationName() {
-        validateSuperUserAccess();
-        return BrokerService.getDynamicConfiguration();
+    public void getDynamicConfigurationName(@Suspended AsyncResponse 
asyncResponse) {
+        validateSuperUserAccessAsync()
+                .thenAccept(__ -> 
asyncResponse.resume(BrokerService.getDynamicConfiguration()))
+                .exceptionally(ex -> {
+                    LOG.error("[{}] Failed to get all dynamic configuration 
names.", clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e1e1dc10676..fdfd9bf1880 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -29,8 +29,8 @@ import com.google.common.hash.Hashing;
 import io.prometheus.client.Counter;
 import java.net.URI;
 import java.net.URL;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -732,16 +732,21 @@ public class NamespaceService implements AutoCloseable {
         return 
pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
     }
 
-    public Map<String, NamespaceOwnershipStatus> getOwnedNameSpacesStatus() 
throws Exception {
-        NamespaceIsolationPolicies nsIsolationPolicies = 
this.getLocalNamespaceIsolationPolicies();
-        Map<String, NamespaceOwnershipStatus> ownedNsStatus = new 
HashMap<String, NamespaceOwnershipStatus>();
-        for (OwnedBundle nsObj : 
this.ownershipCache.getOwnedBundles().values()) {
-            NamespaceOwnershipStatus nsStatus = 
this.getNamespaceOwnershipStatus(nsObj,
-                    
nsIsolationPolicies.getPolicyByNamespace(nsObj.getNamespaceBundle().getNamespaceObject()));
-            ownedNsStatus.put(nsObj.getNamespaceBundle().toString(), nsStatus);
-        }
-
-        return ownedNsStatus;
+    public CompletableFuture<Map<String, NamespaceOwnershipStatus>> 
getOwnedNameSpacesStatusAsync() {
+       return getLocalNamespaceIsolationPoliciesAsync()
+                .thenCompose(namespaceIsolationPolicies -> {
+                    Collection<CompletableFuture<OwnedBundle>> futures =
+                            ownershipCache.getOwnedBundlesAsync().values();
+                    return FutureUtil.waitForAll(futures)
+                            .thenApply(__ -> futures.stream()
+                                    .map(CompletableFuture::join)
+                                    .collect(Collectors.toMap(bundle -> 
bundle.getNamespaceBundle().toString(),
+                                            bundle -> 
getNamespaceOwnershipStatus(bundle,
+                                                    
namespaceIsolationPolicies.getPolicyByNamespace(
+                                                            
bundle.getNamespaceBundle().getNamespaceObject()))
+                                    ))
+                            );
+                });
     }
 
     private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle 
nsObj,
@@ -763,14 +768,10 @@ public class NamespaceService implements AutoCloseable {
         return nsOwnedStatus;
     }
 
-    private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() 
throws Exception {
+    private CompletableFuture<NamespaceIsolationPolicies> 
getLocalNamespaceIsolationPoliciesAsync() {
         String localCluster = pulsar.getConfiguration().getClusterName();
         return 
pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
-                .getIsolationDataPolicies(localCluster)
-                .orElseGet(() -> {
-                    // the namespace isolation policies are empty/undefined = 
an empty object
-                    return new NamespaceIsolationPolicies();
-                });
+                .getIsolationDataPoliciesAsync(localCluster);
     }
 
     public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws 
Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 3d8bf2654c6..67e986b804c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -250,6 +250,10 @@ public class OwnershipCache {
         return this.ownedBundlesCache.synchronous().asMap();
     }
 
+    public Map<NamespaceBundle, CompletableFuture<OwnedBundle>> 
getOwnedBundlesAsync() {
+        return ownedBundlesCache.asMap();
+    }
+
     /**
      * Checked whether a particular bundle is currently owned by this broker.
      *

Reply via email to