This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a98593607dc [improve][broker] Make some methods of `ClusterBase` pure 
async. (#15527)
a98593607dc is described below

commit a98593607dc25dc739c2558452c417b970e5b23a
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri May 20 08:47:41 2022 +0800

    [improve][broker] Make some methods of `ClusterBase` pure async. (#15527)
---
 .../broker/resources/NamespaceResources.java       |  15 +
 .../pulsar/broker/admin/impl/ClustersBase.java     | 303 ++++++++-------------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   8 +-
 .../org/apache/pulsar/common/util/FutureUtil.java  |  11 +
 4 files changed, 143 insertions(+), 194 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index c24df6c586f..2223e951f66 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -212,6 +212,21 @@ public class NamespaceResources extends 
BaseResources<Policies> {
             set(joinPath(BASE_CLUSTERS_PATH, cluster, 
NAMESPACE_ISOLATION_POLICIES), modifyFunction);
         }
 
+        public CompletableFuture<Void> setIsolationDataAsync(String cluster,
+                                                             
Function<Map<String, NamespaceIsolationDataImpl>,
+                                                             Map<String, 
NamespaceIsolationDataImpl>> modifyFunction) {
+            return setAsync(joinPath(BASE_CLUSTERS_PATH, cluster, 
NAMESPACE_ISOLATION_POLICIES), modifyFunction);
+        }
+
+        public CompletableFuture<Void> setIsolationDataWithCreateAsync(String 
cluster,
+                                                                       
Function<Optional<Map<String,
+                                                                       
NamespaceIsolationDataImpl>>,
+                                                                       
Map<String, NamespaceIsolationDataImpl>>
+                                                                               
createFunction) {
+            return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, cluster, 
NAMESPACE_ISOLATION_POLICIES),
+                    createFunction);
+        }
+
         public void setIsolationDataWithCreate(String cluster,
                                      Function<Optional<Map<String, 
NamespaceIsolationDataImpl>>, Map<String,
                                              NamespaceIsolationDataImpl>> 
createFunction)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 80309b2a026..a1fd8a24733 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import com.google.common.collect.Lists;
+import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -33,8 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -46,11 +46,13 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import 
org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
@@ -58,12 +60,10 @@ import 
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
-import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.slf4j.Logger;
@@ -170,7 +170,7 @@ public class ClustersBase extends AdminResource {
                     log.error("[{}] Failed to create cluster {}", 
clientAppId(), cluster, ex);
                     Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
                     if (realCause instanceof IllegalArgumentException) {
-                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                        asyncResponse.resume(new 
RestException(PRECONDITION_FAILED,
                                 "Cluster name is not valid"));
                         return null;
                     }
@@ -278,13 +278,13 @@ public class ClustersBase extends AdminResource {
         if (CollectionUtils.isNotEmpty(peerClusterNames)) {
             future = 
FutureUtil.waitForAll(peerClusterNames.stream().map(peerCluster -> {
                 if (cluster.equalsIgnoreCase(peerCluster)) {
-                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    return FutureUtil.failedFuture(new 
RestException(PRECONDITION_FAILED,
                             cluster + " itself can't be part of peer-list"));
                 }
                 return clusterResources().getClusterAsync(peerCluster)
                         .thenAccept(peerClusterOpt -> {
                             if (!peerClusterOpt.isPresent()) {
-                                throw new 
RestException(Status.PRECONDITION_FAILED,
+                                throw new RestException(PRECONDITION_FAILED,
                                         "Peer cluster " + peerCluster + " does 
not exist");
                             }
                         });
@@ -365,14 +365,14 @@ public class ClustersBase extends AdminResource {
         return 
pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster)
                 .thenCompose(isClusterUsed -> {
                     if (isClusterUsed) {
-                        throw new RestException(Status.PRECONDITION_FAILED, 
"Cluster not empty");
+                        throw new RestException(PRECONDITION_FAILED, "Cluster 
not empty");
                     }
                     // check the namespaceIsolationPolicies associated with 
the cluster
                     return 
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
                 }).thenCompose(nsIsolationPoliciesOpt -> {
                     if (nsIsolationPoliciesOpt.isPresent()) {
                         if 
(!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) {
-                            throw new 
RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
+                            throw new RestException(PRECONDITION_FAILED, 
"Cluster not empty");
                         }
                         // Need to delete the isolation policies if present
                         return 
namespaceIsolationPolicies().deleteIsolationDataAsync(cluster);
@@ -509,9 +509,10 @@ public class ClustersBase extends AdminResource {
                 });
     }
 
+
     private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(
-                                                                    String 
broker,
-                                                                    
Map<String, NamespaceIsolationDataImpl> policies) {
+            String broker,
+            Map<String, NamespaceIsolationDataImpl> policies) {
         BrokerNamespaceIsolationData.Builder brokerIsolationData =
                 BrokerNamespaceIsolationData.builder().brokerName(broker);
         if (policies == null) {
@@ -543,49 +544,23 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 412, message = "Cluster doesn't exist."),
         @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+    public void getBrokerWithNamespaceIsolationPolicy(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The broker name (<broker-hostname>:<web-service-port>)",
-            required = true,
-            example = "broker1:8080"
-        )
+        @ApiParam(value = "The broker name 
(<broker-hostname>:<web-service-port>)", required = true,
+            example = "broker1:8080")
         @PathParam("broker") String broker) {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-
-        Map<String, ? extends NamespaceIsolationData> nsPolicies;
-        try {
-            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster);
-            if (!nsPoliciesResult.isPresent()) {
-                throw new RestException(Status.NOT_FOUND, "namespace-isolation 
policies not found for " + cluster);
-            }
-            nsPolicies = nsPoliciesResult.get().getPolicies();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get namespace isolation-policies {}", 
clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-        BrokerNamespaceIsolationData.Builder brokerIsolationData = 
BrokerNamespaceIsolationData.builder()
-                .brokerName(broker);
-        if (nsPolicies != null) {
-            List<String> namespaceRegexes = new ArrayList<>();
-            nsPolicies.forEach((name, policyData) -> {
-                NamespaceIsolationPolicyImpl nsPolicyImpl = new 
NamespaceIsolationPolicyImpl(policyData);
-                boolean isPrimary = nsPolicyImpl.isPrimaryBroker(broker);
-                if (isPrimary || nsPolicyImpl.isSecondaryBroker(broker)) {
-                    namespaceRegexes.addAll(policyData.getNamespaces());
-                    brokerIsolationData.primary(isPrimary);
-                    brokerIsolationData.policyName(name);
-                }
-            });
-            brokerIsolationData.namespaceRegex(namespaceRegexes);
-        }
-        return brokerIsolationData.build();
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
+                .thenCompose(__ -> 
internalGetNamespaceIsolationPolicies(cluster))
+                .thenApply(policies -> 
internalGetBrokerNsIsolationData(broker, policies))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get namespace isolation-policies 
{}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -603,150 +578,99 @@ public class ClustersBase extends AdminResource {
     })
     public void setNamespaceIsolationPolicy(
         @Suspended final AsyncResponse asyncResponse,
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
+        @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The namespace isolation policy name",
-            required = true
-        )
+        @ApiParam(value = "The namespace isolation policy name", required = 
true)
         @PathParam("policyName") String policyName,
-        @ApiParam(
-            value = "The namespace isolation policy data",
-            required = true
-        )
-                NamespaceIsolationDataImpl policyData
+        @ApiParam(value = "The namespace isolation policy data", required = 
true)
+        NamespaceIsolationDataImpl policyData
     ) {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-        validatePoliciesReadOnlyAccess();
-
-        String jsonInput = null;
-        try {
-            // validate the policy data before creating the node
-            policyData.validate();
-            jsonInput = 
ObjectMapperFactory.create().writeValueAsString(policyData);
-
-            NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster).orElseGet(() -> {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
+                .thenCompose(__ -> {
+                    // validate the policy data before creating the node
+                    policyData.validate();
+                    return 
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
+                }).thenCompose(nsIsolationPoliciesOpt ->
+                        
nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+                                .orElseGet(() -> namespaceIsolationPolicies()
+                                        
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
+                                        .thenApply(__ -> new 
NamespaceIsolationPolicies()))
+                ).thenCompose(nsIsolationPolicies -> {
+                    nsIsolationPolicies.setPolicy(policyName, policyData);
+                    return namespaceIsolationPolicies()
+                                    .setIsolationDataAsync(cluster, old -> 
nsIsolationPolicies.getPolicies());
+                }).thenCompose(__ -> 
filterAndUnloadMatchedNamespaceAsync(policyData))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successful to update 
clusters/{}/namespaceIsolationPolicies/{}.",
+                            clientAppId(), cluster, policyName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof IllegalArgumentException) {
+                        String jsonData;
                         try {
-                            
namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
-                                    (p) -> Collections.emptyMap());
-                            return new NamespaceIsolationPolicies();
-                        } catch (Exception e) {
-                            throw new RestException(e);
+                            jsonData = JsonUtil.toJson(policyData);
+                        } catch (JsonUtil.ParseJsonException e) {
+                            jsonData = "[Failed to serialize]";
                         }
-                    });
-
-            nsIsolationPolicies.setPolicy(policyName, policyData);
-            namespaceIsolationPolicies().setIsolationData(cluster, old -> 
nsIsolationPolicies.getPolicies());
-
-            // whether or not make the isolation update on time.
-            if 
(pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
-                filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
-            } else {
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            }
-        } catch (IllegalArgumentException iae) {
-            log.info("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
-                    clientAppId(), cluster, policyName, iae);
-            asyncResponse.resume(new RestException(Status.BAD_REQUEST,
-                    "Invalid format of input policy data. policy: " + 
policyName + "; data: " + jsonInput));
-        } catch (NotFoundException nne) {
-            log.warn("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
-                    cluster);
-            asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                    "NamespaceIsolationPolicies for cluster " + cluster + " 
does not exist"));
-        } catch (Exception e) {
-            log.error("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
-                    policyName, e);
-            asyncResponse.resume(new RestException(e));
-        }
-    }
-
-    // get matched namespaces; call unload for each namespaces;
-    private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
-                                                  NamespaceIsolationDataImpl 
policyData) throws Exception {
-        Namespaces namespaces = pulsar().getAdminClient().namespaces();
-
-        List<String> nssToUnload = Lists.newArrayList();
-
-        pulsar().getAdminClient().tenants().getTenantsAsync()
-            .whenComplete((tenants, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to get tenants when 
setNamespaceIsolationPolicy.", clientAppId(), ex);
-                    return;
-                }
-                AtomicInteger tenantsNumber = new 
AtomicInteger(tenants.size());
-                // get all tenants now, for each tenants, get its namespaces
-                tenants.forEach(tenant -> namespaces.getNamespacesAsync(tenant)
-                    .whenComplete((nss, e) -> {
-                        int leftTenantsToHandle = 
tenantsNumber.decrementAndGet();
-                        if (e != null) {
-                            log.error("[{}] Failed to get namespaces for 
tenant {} when setNamespaceIsolationPolicy.",
-                                clientAppId(), tenant, e);
-
-                            if (leftTenantsToHandle == 0) {
-                                unloadMatchedNamespacesList(asyncResponse, 
nssToUnload, namespaces);
-                            }
-
-                            return;
-                        }
-
-                        AtomicInteger nssNumber = new 
AtomicInteger(nss.size());
-
-                        // get all namespaces for this tenant now.
-                        nss.forEach(namespaceName -> {
-                            int leftNssToHandle = nssNumber.decrementAndGet();
-
-                            // if namespace match any policy regex, add it to 
ns list to be unload.
-                            if (policyData.getNamespaces().stream()
-                                .anyMatch(nsnameRegex -> 
namespaceName.matches(nsnameRegex))) {
-                                nssToUnload.add(namespaceName);
-                            }
-
-                            // all the tenants & namespaces get filtered.
-                            if (leftNssToHandle == 0 && leftTenantsToHandle == 
0) {
-                                unloadMatchedNamespacesList(asyncResponse, 
nssToUnload, namespaces);
-                            }
-                        });
-                    }));
-            });
+                        asyncResponse.resume(new 
RestException(Status.BAD_REQUEST,
+                                "Invalid format of input policy data. policy: 
" + policyName + "; data: " + jsonData));
+                        return null;
+                    } else if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies: Does not exist",
+                                clientAppId(), cluster);
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
+                        return null;
+                    }
+                    log.info("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
+                            clientAppId(), cluster, policyName, realCause);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
-    private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
-                                             List<String> nssToUnload,
-                                             Namespaces namespaces) {
-        if (nssToUnload.size() == 0) {
-            asyncResponse.resume(Response.noContent().build());
-            return;
+    /**
+     * Get matched namespaces; call unload for each namespaces.
+     */
+    private CompletableFuture<Void> 
filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
+        PulsarAdmin adminClient;
+        try {
+            adminClient = pulsar().getAdminClient();
+        } catch (PulsarServerException e) {
+            return FutureUtil.failedFuture(e);
         }
-
-        List<CompletableFuture<Void>> futures = nssToUnload.stream()
-            .map(namespaceName -> namespaces.unloadAsync(namespaceName))
-            .collect(Collectors.toList());
-
-        FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
-            if (exception != null) {
-                log.error("[{}] Failed to unload namespace while 
setNamespaceIsolationPolicy.",
-                    clientAppId(), exception);
-                asyncResponse.resume(new RestException(exception));
-                return;
-            }
-
-            try {
-                // write load info to load manager to make the load happens 
fast
-                
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
-            } catch (Exception e) {
-                log.warn("[{}] Failed to writeLoadReportOnZookeeper.", 
clientAppId(), e);
-            }
-
-            asyncResponse.resume(Response.noContent().build());
-            return;
-        });
+        return adminClient.tenants().getTenantsAsync()
+                .thenCompose(tenants -> {
+                    Stream<CompletableFuture<List<String>>> 
completableFutureStream = tenants.stream()
+                            .map(tenant -> 
adminClient.namespaces().getNamespacesAsync(tenant));
+                    return FutureUtil.waitForAll(completableFutureStream)
+                            .thenApply(namespaces -> {
+                                // if namespace match any policy regex, add it 
to ns list to be unload.
+                                return namespaces.stream()
+                                        .filter(namespaceName ->
+                                                
policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
+                                        .collect(Collectors.toList());
+                            });
+                }).thenCompose(shouldUnloadNamespaces -> {
+                    if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    List<CompletableFuture<Void>> futures = 
shouldUnloadNamespaces.stream()
+                            .map(namespaceName -> 
adminClient.namespaces().unloadAsync(namespaceName))
+                            .collect(Collectors.toList());
+                    return FutureUtil.waitForAll(futures)
+                            .thenAccept(__ -> {
+                                try {
+                                    // write load info to load manager to make 
the load happens fast
+                                    
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
+                                } catch (Exception e) {
+                                    log.warn("[{}] Failed to 
writeLoadReportOnZookeeper.", clientAppId(), e);
+                                }
+                            });
+                });
     }
 
     @DELETE
@@ -1012,5 +936,4 @@ public class ClustersBase extends AdminResource {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ClustersBase.class);
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 3f28f9d8e44..ff41edfb48b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -253,8 +253,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                         .parameters(parameters1)
                         .build())
                 .build();
-        AsyncResponse response = mock(AsyncResponse.class);
-        clusters.setNamespaceIsolationPolicy(response,"use", "policy1", 
policyData);
+        asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx,
+                "use", "policy1", policyData));
         asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
 
         try {
@@ -403,8 +403,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
-        verify(clusters, times(22)).validateSuperUserAccessAsync();
-        verify(clusters, times(2)).validateSuperUserAccess();
+        verify(clusters, times(23)).validateSuperUserAccessAsync();
+        verify(clusters, times(1)).validateSuperUserAccess();
     }
 
     @Test
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index e5c2caeb7d0..afad61eb669 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.common.util;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -31,6 +33,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * This class is aimed at simplifying work with {@code CompletableFuture}.
@@ -47,6 +50,14 @@ public class FutureUtil {
         return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0]));
     }
 
+    public static <T> CompletableFuture<List<T>> 
waitForAll(Stream<CompletableFuture<List<T>>> futures) {
+        return futures.reduce(CompletableFuture.completedFuture(new 
ArrayList<>()),
+                (pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> 
{
+                    preV.addAll(currV);
+                    return preV;
+                })));
+    }
+
     /**
      * Return a future that represents the completion of any future in the 
provided Collection.
      *

Reply via email to