This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d9340aca6e [improve][broker] Make some methods of ClusterBase pure 
async.  (#15685)
8d9340aca6e is described below

commit 8d9340aca6e5581183153738338744cb59106e11
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri May 27 17:10:20 2022 +0800

    [improve][broker] Make some methods of ClusterBase pure async.  (#15685)
---
 .../pulsar/broker/resources/ClusterResources.java  |  12 +
 .../pulsar/broker/admin/impl/ClustersBase.java     | 274 +++++++++++----------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   5 +-
 3 files changed, 156 insertions(+), 135 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 7dbf2839b39..91639578d26 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -141,10 +141,17 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
             super(store, clazz, operationTimeoutSec);
         }
 
+        public CompletableFuture<List<String>> listFailureDomainsAsync(String 
clusterName) {
+            return getChildrenAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, 
FAILURE_DOMAIN));
+        }
         public List<String> listFailureDomains(String clusterName) throws 
MetadataStoreException {
             return getChildren(joinPath(BASE_CLUSTERS_PATH, clusterName, 
FAILURE_DOMAIN));
         }
 
+        public CompletableFuture<Optional<FailureDomainImpl>> 
getFailureDomainAsync(String clusterName,
+                                                                               
     String domainName) {
+            return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, 
FAILURE_DOMAIN, domainName));
+        }
         public Optional<FailureDomainImpl> getFailureDomain(String 
clusterName, String domainName)
                 throws MetadataStoreException {
             return get(joinPath(BASE_CLUSTERS_PATH, clusterName, 
FAILURE_DOMAIN, domainName));
@@ -183,6 +190,11 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
             delete(failureDomainPath);
         }
 
+        public CompletableFuture<Void> setFailureDomainWithCreateAsync(String 
clusterName, String domainName,
+                                           
Function<Optional<FailureDomainImpl>, FailureDomainImpl> createFunction) {
+            return setWithCreateAsync(
+                    joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, 
domainName), createFunction);
+        }
         public void setFailureDomainWithCreate(String clusterName, String 
domainName,
                                                
Function<Optional<FailureDomainImpl>, FailureDomainImpl> createFunction)
                 throws MetadataStoreException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index a1fd8a24733..034d7b5f9fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
-import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
@@ -31,6 +30,7 @@ import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -41,6 +41,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -48,9 +49,9 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
-import 
org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.Constants;
@@ -686,46 +687,38 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void deleteNamespaceIsolationPolicy(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The namespace isolation policy name",
-            required = true
-        )
+        @ApiParam(value = "The namespace isolation policy name", required = 
true)
         @PathParam("policyName") String policyName
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validatePoliciesReadOnlyAccess();
-
-        try {
-
-            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster).orElseGet(() -> {
-                        try {
-                            
namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
-                                    (p) -> Collections.emptyMap());
-                            return new NamespaceIsolationPolicies();
-                        } catch (Exception e) {
-                            throw new RestException(e);
-                        }
-                    });
-
-            nsIsolationPolicies.deletePolicy(policyName);
-            namespaceIsolationPolicies().setIsolationData(cluster, old -> 
nsIsolationPolicies.getPolicies());
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update 
brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
-                    cluster);
-            throw new RestException(Status.NOT_FOUND,
-                    "NamespaceIsolationPolicies for cluster " + cluster + " 
does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update 
brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
-                    policyName, e);
-            throw new RestException(e);
-        }
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> 
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
+                .thenCompose(nsIsolationPoliciesOpt -> 
nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+                        .orElseGet(() -> namespaceIsolationPolicies()
+                                .setIsolationDataWithCreateAsync(cluster, (p) 
-> Collections.emptyMap())
+                                .thenApply(__ -> new 
NamespaceIsolationPolicies())))
+                .thenCompose(policies -> {
+                    policies.deletePolicy(policyName);
+                    return 
namespaceIsolationPolicies().setIsolationDataAsync(cluster, old -> 
policies.getPolicies());
+                }).thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to update 
brokers/{}/namespaceIsolationPolicies: Does not exist",
+                                clientAppId(), cluster);
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to update 
brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
+                            policyName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -742,38 +735,37 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void setFailureDomain(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The failure domain name",
-            required = true
-        )
+        @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName,
-        @ApiParam(
-            value = "The configuration data of a failure domain",
-            required = true
-        )
-                FailureDomainImpl domain
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validateBrokerExistsInOtherDomain(cluster, domainName, domain);
-
-        try {
-            clusterResources().getFailureDomainResources()
-                    .setFailureDomainWithCreate(cluster, domainName, old -> 
domain);
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update domain {}. clusters {}  Does not 
exist", clientAppId(), cluster,
-                    domainName);
-            throw new RestException(Status.NOT_FOUND,
-                    "Domain " + domainName + " for cluster " + cluster + " 
does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update clusters/{}/domainName/{}", 
clientAppId(), cluster, domainName, e);
-            throw new RestException(e);
-        }
+        @ApiParam(value = "The configuration data of a failure domain", 
required = true) FailureDomainImpl domain
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
+                .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster, 
domainName, domain))
+                .thenCompose(__ -> 
clusterResources().getFailureDomainResources()
+                        .setFailureDomainWithCreateAsync(cluster, domainName, 
old -> domain))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successful set failure domain {} for 
cluster {}",
+                            clientAppId(), domainName, cluster);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to update domain {}. clusters {} 
 Does not exist", clientAppId(), cluster,
+                                domainName);
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "Domain " + domainName + " for cluster " + 
cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to update 
clusters/{}/domainName/{}",
+                            clientAppId(), cluster, domainName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -788,34 +780,45 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 500, message = "Internal server error")
     })
-    public Map<String, FailureDomainImpl> getFailureDomains(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+    public void getFailureDomains(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster
-    ) throws Exception {
-        validateSuperUserAccess();
-
-        Map<String, FailureDomainImpl> domains = Maps.newHashMap();
-        try {
-            FailureDomainResources fdr = 
clusterResources().getFailureDomainResources();
-            for (String domainName : fdr.listFailureDomains(cluster)) {
-                try {
-                    Optional<FailureDomainImpl> domain = 
fdr.getFailureDomain(cluster, domainName);
-                    domain.ifPresent(failureDomain -> domains.put(domainName, 
failureDomain));
-                } catch (Exception e) {
-                    log.warn("Failed to get domain {}", domainName, e);
-                }
-            }
-        } catch (NotFoundException e) {
-            log.warn("[{}] Failure-domain is not configured for cluster {}", 
clientAppId(), cluster, e);
-            return Collections.emptyMap();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get failure-domains for cluster {}", 
clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-        return domains;
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> 
clusterResources().getFailureDomainResources()
+                        .listFailureDomainsAsync(cluster)
+                        .thenCompose(domainNames -> {
+                            List<CompletableFuture<Pair<String, 
Optional<FailureDomainImpl>>>> futures =
+                                domainNames.stream()
+                                    .map(domainName -> 
clusterResources().getFailureDomainResources()
+                                            .getFailureDomainAsync(cluster, 
domainName)
+                                            .thenApply(failureDomainImpl -> 
Pair.of(domainName, failureDomainImpl))
+                                            .exceptionally(ex -> {
+                                                log.warn("Failed to get domain 
{}", domainName, ex);
+                                                return null;
+                                            })).collect(Collectors.toList());
+                            return FutureUtil.waitForAll(futures)
+                                    .thenApply(unused -> futures.stream()
+                                            .map(CompletableFuture::join)
+                                            .filter(Objects::nonNull)
+                                            .filter(v -> 
v.getRight().isPresent())
+                                            
.collect(Collectors.toMap(Pair::getLeft, v -> v.getRight().get())));
+                        }).exceptionally(ex -> {
+                            Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                            if (realCause instanceof NotFoundException) {
+                                log.warn("[{}] Failure-domain is not 
configured for cluster {}",
+                                        clientAppId(), cluster, ex);
+                                return Collections.emptyMap();
+                            }
+                            throw FutureUtil.wrapToCompletionException(ex);
+                        })
+                ).thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get failure-domains for cluster 
{}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -897,42 +900,49 @@ public class ClustersBase extends AdminResource {
         }
     }
 
-    private void validateBrokerExistsInOtherDomain(final String cluster, final 
String inputDomainName,
-            final FailureDomainImpl inputDomain) {
-        if (inputDomain != null && inputDomain.brokers != null) {
-            try {
-                for (String domainName : 
clusterResources().getFailureDomainResources()
-                        .listFailureDomains(cluster)) {
-                    if (inputDomainName.equals(domainName)) {
-                        continue;
-                    }
-                    try {
-                        Optional<FailureDomainImpl> domain =
-                                
clusterResources().getFailureDomainResources().getFailureDomain(cluster, 
domainName);
-                        if (domain.isPresent() && domain.get().brokers != 
null) {
-                            List<String> duplicateBrokers = 
domain.get().brokers.stream().parallel()
-                                    
.filter(inputDomain.brokers::contains).collect(Collectors.toList());
-                            if (!duplicateBrokers.isEmpty()) {
-                                throw new RestException(Status.CONFLICT,
-                                        duplicateBrokers + " already exists in 
" + domainName);
-                            }
-                        }
-                    } catch (Exception e) {
-                        if (e instanceof RestException) {
-                            throw e;
-                        }
-                        log.warn("Failed to get domain {}", domainName, e);
-                    }
-                }
-            } catch (NotFoundException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Domain is not configured for cluster", 
clientAppId(), e);
-                }
-            } catch (Exception e) {
-                log.error("[{}] Failed to get domains for cluster {}", 
clientAppId(), e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final 
String cluster,
+                                                                      final 
String inputDomainName,
+                                                                      final 
FailureDomainImpl inputDomain) {
+        if (inputDomain == null || inputDomain.brokers == null) {
+            return CompletableFuture.completedFuture(null);
         }
+        return clusterResources().getFailureDomainResources()
+                .listFailureDomainsAsync(cluster)
+                .thenCompose(domainNames -> {
+                    List<CompletableFuture<Void>> futures = 
domainNames.stream()
+                            .filter(domainName -> 
!domainName.equals(inputDomainName))
+                            .map(domainName -> clusterResources()
+                                    
.getFailureDomainResources().getFailureDomainAsync(cluster, domainName)
+                                    .thenAccept(failureDomainOpt -> {
+                                        if (failureDomainOpt.isPresent()
+                                                && 
CollectionUtils.isNotEmpty(failureDomainOpt.get().getBrokers())) {
+                                            List<String> duplicateBrokers = 
failureDomainOpt.get()
+                                                    
.getBrokers().stream().parallel()
+                                                    
.filter(inputDomain.brokers::contains)
+                                                    
.collect(Collectors.toList());
+                                            if 
(CollectionUtils.isNotEmpty(duplicateBrokers)) {
+                                                throw new 
RestException(Status.CONFLICT,
+                                                        duplicateBrokers + " 
already exists in " + domainName);
+                                            }
+                                        }
+                                    }).exceptionally(ex -> {
+                                        Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                                        if (realCause instanceof 
WebApplicationException) {
+                                            throw 
FutureUtil.wrapToCompletionException(ex);
+                                        }
+                                        if (realCause instanceof 
NotFoundException) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("[{}] Domain is not 
configured for cluster",
+                                                        clientAppId(), ex);
+                                            }
+                                            return null;
+                                        }
+                                        log.warn("Failed to get domain {}", 
domainName, ex);
+                                        return null;
+                                    })
+                            ).collect(Collectors.toList());
+                    return FutureUtil.waitForAll(futures);
+                });
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ClustersBase.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index ff41edfb48b..c6b1fbd65bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -264,7 +264,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), 412);
         }
 
-        clusters.deleteNamespaceIsolationPolicy("use", "policy1");
+        asyncRequests(ctx -> clusters.deleteNamespaceIsolationPolicy(ctx, 
"use", "policy1"));
         assertTrue(((Map<String, NamespaceIsolationDataImpl>) 
asyncRequests(ctx ->
                 clusters.getNamespaceIsolationPolicies(ctx, 
"use"))).isEmpty());
 
@@ -403,8 +403,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
-        verify(clusters, times(23)).validateSuperUserAccessAsync();
-        verify(clusters, times(1)).validateSuperUserAccess();
+        verify(clusters, times(24)).validateSuperUserAccessAsync();
     }
 
     @Test

Reply via email to