This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eb1daaeb05a Make some operation replication clusters methods in 
NamespacesBase async (#15760)
eb1daaeb05a is described below

commit eb1daaeb05a351c5b832a3f49e1f96e8ba4df3f1
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jun 2 09:17:54 2022 +0800

    Make some operation replication clusters methods in NamespacesBase async 
(#15760)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  9 +++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 81 ++++++++++++----------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 35 +++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 28 ++++++--
 .../pulsar/broker/web/PulsarWebResource.java       | 22 ++++++
 .../apache/pulsar/broker/admin/NamespacesTest.java | 78 +++++++++++----------
 6 files changed, 166 insertions(+), 87 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 43addc30af7..3743a6e9ed9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -426,6 +426,15 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<Set<String>> clustersAsync() {
+        return clusterResources().listAsync()
+                .thenApply(list ->
+                        list.stream()
+                                .filter(cluster -> 
!Constants.GLOBAL_CLUSTER.equals(cluster))
+                                .collect(Collectors.toSet())
+                );
+    }
+
     protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b143a03038b..af6f465e433 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -751,45 +751,52 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
-    protected Set<String> internalGetNamespaceReplicationClusters() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.READ);
-
-        if (!namespaceName.isGlobal()) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Cannot get the replication clusters for a non-global 
namespace");
-        }
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.replication_clusters;
+    protected CompletableFuture<Set<String>> 
internalGetNamespaceReplicationClustersAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.READ)
+                .thenAccept(__ -> {
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot get the replication clusters for a 
non-global namespace");
+                    }
+                }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.replication_clusters);
     }
 
-    protected void internalSetNamespaceReplicationClusters(List<String> 
clusterIds) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        checkNotNull(clusterIds, "ClusterIds should not be null");
-
-        Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
-        if (!namespaceName.isGlobal()) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Cannot set 
replication on a non-global namespace");
-        }
-
-        if (replicationClusterSet.contains("global")) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Cannot specify global in the list of replication 
clusters");
-        }
-
-        Set<String> clusters = clusters();
-        for (String clusterId : replicationClusterSet) {
-            if (!clusters.contains(clusterId)) {
-                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: 
" + clusterId);
-            }
-            validatePeerClusterConflict(clusterId, replicationClusterSet);
-            validateClusterForTenant(namespaceName.getTenant(), clusterId);
-        }
-        updatePolicies(namespaceName, policies ->{
-            policies.replication_clusters = replicationClusterSet;
-            return policies;
-        });
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    protected CompletableFuture<Void> 
internalSetNamespaceReplicationClusters(List<String> clusterIds) {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenApply(__ -> {
+                    checkNotNull(clusterIds, "ClusterIds should not be null");
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot set replication on a non-global 
namespace");
+                    }
+                    Set<String> replicationClusterSet = 
Sets.newHashSet(clusterIds);
+                    if (replicationClusterSet.contains("global")) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot specify global in the list of 
replication clusters");
+                    }
+                    return replicationClusterSet;
+                }).thenCompose(replicationClusterSet -> clustersAsync()
+                        .thenCompose(clusters -> {
+                            List<CompletableFuture<Void>> futures =
+                                    
replicationClusterSet.stream().map(clusterId -> {
+                                        if (!clusters.contains(clusterId)) {
+                                            throw new 
RestException(Status.FORBIDDEN,
+                                                    "Invalid cluster id: " + 
clusterId);
+                                        }
+                                        return 
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
+                                                .thenCompose(__ ->
+                                                        
validateClusterForTenantAsync(
+                                                                
namespaceName.getTenant(), clusterId));
+                                    }).collect(Collectors.toList());
+                            return FutureUtil.waitForAll(futures).thenApply(__ 
-> replicationClusterSet);
+                        }))
+                .thenCompose(replicationClusterSet -> 
updatePoliciesAsync(namespaceName, policies -> {
+                    policies.replication_clusters = replicationClusterSet;
+                    return policies;
+                }));
     }
 
     protected CompletableFuture<Void> 
internalSetNamespaceMessageTTLAsync(Integer messageTTL) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index fc207efd7a7..ba95927dcd5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -363,13 +363,21 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not global")})
-    public Set<String> getNamespaceReplicationClusters(@PathParam("property") 
String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
+    public void getNamespaceReplicationClusters(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("property") String 
property,
+                                                @PathParam("cluster") String 
cluster,
+                                                @PathParam("namespace") String 
namespace) {
         validateNamespaceName(property, cluster, namespace);
-        validateNamespacePolicyOperation(NamespaceName.get(property, 
namespace),
-                PolicyName.REPLICATION, PolicyOperation.READ);
-
-        return internalGetNamespaceReplicationClusters();
+        validateNamespacePolicyOperationAsync(NamespaceName.get(property, 
namespace),
+                PolicyName.REPLICATION, PolicyOperation.READ)
+                .thenCompose(__ -> 
internalGetNamespaceReplicationClustersAsync())
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to get namespace replication 
clusters on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @POST
@@ -379,10 +387,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Peer-cluster can't be part of 
replication-cluster"),
             @ApiResponse(code = 412, message = "Namespace is not global or 
invalid cluster ids") })
-    public void setNamespaceReplicationClusters(@PathParam("property") String 
property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace, List<String> clusterIds) {
+    public void setNamespaceReplicationClusters(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("property") String 
property,
+                                                @PathParam("cluster") String 
cluster,
+                                                @PathParam("namespace") String 
namespace, List<String> clusterIds) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetNamespaceReplicationClusters(clusterIds);
+        internalSetNamespaceReplicationClusters(clusterIds)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to set namespace replication 
clusters on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e246c7d86e6..eaa8232032b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -308,10 +308,18 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not global")})
-    public Set<String> getNamespaceReplicationClusters(@PathParam("tenant") 
String tenant,
-            @PathParam("namespace") String namespace) {
+    public void getNamespaceReplicationClusters(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetNamespaceReplicationClusters();
+        internalGetNamespaceReplicationClustersAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to get namespace replication 
clusters on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @POST
@@ -321,11 +329,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
             @ApiResponse(code = 409, message = "Peer-cluster can't be part of 
replication-cluster"),
             @ApiResponse(code = 412, message = "Namespace is not global or 
invalid cluster ids") })
-    public void setNamespaceReplicationClusters(@PathParam("tenant") String 
tenant,
-            @PathParam("namespace") String namespace,
+    public void setNamespaceReplicationClusters(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace,
             @ApiParam(value = "List of replication clusters", required = true) 
List<String> clusterIds) {
         validateNamespaceName(tenant, namespace);
-        internalSetNamespaceReplicationClusters(clusterIds);
+        internalSetNamespaceReplicationClusters(clusterIds)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to set namespace replication 
clusters on namespace {}",
+                            clientAppId(), namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e491f78c9e2..22449e7f119 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -395,6 +395,28 @@ public abstract class PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<Void> validatePeerClusterConflictAsync(String 
clusterName,
+                                                                       
Set<String> replicationClusters) {
+        return clusterResources().getClusterAsync(clusterName)
+                .thenAccept(data -> {
+                    ClusterData clusterData = data.orElseThrow(() -> new 
RestException(
+                            Status.PRECONDITION_FAILED, "Invalid replication 
cluster " + clusterName));
+                    Set<String> peerClusters = 
clusterData.getPeerClusterNames();
+                    if (peerClusters != null && !peerClusters.isEmpty()) {
+                        Sets.SetView<String> conflictPeerClusters =
+                                Sets.intersection(peerClusters, 
replicationClusters);
+                        if (!conflictPeerClusters.isEmpty()) {
+                            log.warn("[{}] {}'s peer cluster can't be part of 
replication clusters {}", clientAppId(),
+                                    clusterName, conflictPeerClusters);
+                            throw new RestException(Status.CONFLICT,
+                                    String.format("%s's peer-clusters %s can't 
be part of replication-clusters %s",
+                                            clusterName,
+                                            conflictPeerClusters, 
replicationClusters));
+                        }
+                    }
+                });
+    }
+
     protected void validateClusterForTenant(String tenant, String cluster) {
         TenantInfo tenantInfo;
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index dba0b6bd6ba..e6b928a57a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -486,32 +486,37 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testGlobalNamespaceReplicationConfiguration() throws Exception 
{
-        assertEquals(
-                
namespaces.getNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                        this.testGlobalNamespaces.get(0).getCluster(), 
this.testGlobalNamespaces.get(0).getLocalName()),
-                Sets.newHashSet());
-
-        
namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                this.testGlobalNamespaces.get(0).getCluster(), 
this.testGlobalNamespaces.get(0).getLocalName(),
-                Lists.newArrayList("use", "usw"));
-        assertEquals(
-                
namespaces.getNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                        this.testGlobalNamespaces.get(0).getCluster(), 
this.testGlobalNamespaces.get(0).getLocalName()),
-                Lists.newArrayList("use", "usw"));
+
+        Set<String> repCluster = (Set<String>) asyncRequests(rsp -> 
namespaces.getNamespaceReplicationClusters(rsp,
+                this.testGlobalNamespaces.get(0).getTenant(), 
this.testGlobalNamespaces.get(0).getCluster(),
+                this.testGlobalNamespaces.get(0).getLocalName()));
+        assertEquals(repCluster, Sets.newHashSet());
+
+        asyncRequests(rsp -> namespaces.setNamespaceReplicationClusters(rsp,
+                this.testGlobalNamespaces.get(0).getTenant(), 
this.testGlobalNamespaces.get(0).getCluster(),
+                this.testGlobalNamespaces.get(0).getLocalName(),
+                Lists.newArrayList("use", "usw")));
+
+        repCluster = (Set<String>) asyncRequests(rsp -> 
namespaces.getNamespaceReplicationClusters(rsp,
+                this.testGlobalNamespaces.get(0).getTenant(), 
this.testGlobalNamespaces.get(0).getCluster(),
+                this.testGlobalNamespaces.get(0).getLocalName()));
+        assertEquals(repCluster, Lists.newArrayList("use", "usw"));
 
         try {
-            
namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                    this.testGlobalNamespaces.get(0).getCluster(), 
this.testGlobalNamespaces.get(0).getLocalName(),
-                    Lists.newArrayList("use", "invalid-cluster"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp,
+                    this.testGlobalNamespaces.get(0).getTenant(), 
this.testGlobalNamespaces.get(0).getCluster(),
+                    this.testGlobalNamespaces.get(0).getLocalName(),
+                    Lists.newArrayList("use", "invalid-cluster")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.FORBIDDEN.getStatusCode());
         }
 
         try {
-            
namespaces.setNamespaceReplicationClusters(this.testGlobalNamespaces.get(0).getTenant(),
-                    this.testGlobalNamespaces.get(0).getCluster(), 
this.testGlobalNamespaces.get(0).getLocalName(),
-                    Lists.newArrayList("use", "global"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp,
+                    this.testGlobalNamespaces.get(0).getTenant(), 
this.testGlobalNamespaces.get(0).getCluster(),
+                    this.testGlobalNamespaces.get(0).getLocalName(),
+                    Lists.newArrayList("use", "global")));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, global should not be allowed in the list of replication 
clusters
@@ -519,8 +524,9 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, 
"global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use", "invalid-cluster"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(),
+                    Lists.newArrayList("use", "invalid-cluster")));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, invalid-cluster is an invalid cluster id
@@ -531,8 +537,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 new TenantInfoImpl(Sets.newHashSet("role1", "role2"), 
Sets.newHashSet("use", "usc")));
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, 
"global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use", "usw"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use", "usw")));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, usw was not configured in the list of allowed clusters
@@ -544,8 +550,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         mockZooKeeperGlobal.setAlwaysFail(Code.SESSIONEXPIRED);
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, 
"global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -566,23 +572,24 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         policiesCache.invalidateAll();
         store.invalidateAll();
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, 
"global",
-                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, "global",
+                    this.testGlobalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 500);
         }
 
         try {
-            namespaces.getNamespaceReplicationClusters(this.testTenant, 
"global", "non-existing-ns");
+            asyncRequests(rsp -> 
namespaces.getNamespaceReplicationClusters(rsp, this.testTenant,
+                                                                     "global", 
"non-existing-ns"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, 
"global", "non-existing-ns",
-                    Lists.newArrayList("use"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp, this.testTenant,
+                    "global", "non-existing-ns", Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
@@ -597,24 +604,25 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         store.invalidateAll();
         // ensure the ZooKeeper read happens, bypassing the cache
         try {
-            namespaces.getNamespaceReplicationClusters(this.testTenant, 
"global",
-                    this.testGlobalNamespaces.get(0).getLocalName());
+            asyncRequests(rsp -> 
namespaces.getNamespaceReplicationClusters(rsp,
+                    this.testGlobalNamespaces.get(0).getTenant(), 
this.testGlobalNamespaces.get(0).getCluster(),
+                    this.testGlobalNamespaces.get(0).getLocalName()));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 500);
         }
 
         try {
-            namespaces.getNamespaceReplicationClusters(this.testTenant, 
this.testLocalCluster,
-                    this.testLocalNamespaces.get(0).getLocalName());
+            asyncRequests(rsp -> 
namespaces.getNamespaceReplicationClusters(rsp, this.testTenant,
+                    this.testLocalCluster, 
this.testLocalNamespaces.get(0).getLocalName()));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
 
         try {
-            namespaces.setNamespaceReplicationClusters(this.testTenant, 
this.testLocalCluster,
-                    this.testLocalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use"));
+            asyncRequests(rsp -> 
namespaces.setNamespaceReplicationClusters(rsp, this.testTenant, 
this.testLocalCluster,
+                    this.testLocalNamespaces.get(0).getLocalName(), 
Lists.newArrayList("use")));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());

Reply via email to