This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7b2e72464b8 [feat][broker][branch-3.0] PIP-321 Introduce 
allowed-cluster at the namespace level (#22378) (#22960)
7b2e72464b8 is described below

commit 7b2e72464b83507c0cf17ff0b364a4883d682d1f
Author: Kai Wang <[email protected]>
AuthorDate: Mon Jul 1 10:01:10 2024 +0800

    [feat][broker][branch-3.0] PIP-321 Introduce allowed-cluster at the 
namespace level (#22378) (#22960)
    
    (cherry-picked from commit 
https://github.com/apache/pulsar/commit/36bae695fb07f3ee790bee603149c4c2712187e0)
    Co-authored-by: Xiangying Meng 
<[email protected]>
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  44 ++++---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  79 ++++++++++++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  46 ++++++++
 .../broker/service/persistent/PersistentTopic.java |  94 +++++++++------
 .../pulsar/broker/web/PulsarWebResource.java       |  15 ++-
 .../broker/namespace/NamespaceServiceTest.java     | 127 +++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTest.java      |  56 +++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java |  84 ++++++++++++++
 .../pulsar/common/policies/data/Policies.java      |   5 +-
 .../client/admin/internal/NamespacesImpl.java      |  22 ++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   8 ++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  32 ++++++
 .../pulsar/common/policies/data/PolicyName.java    |   3 +-
 13 files changed, 546 insertions(+), 69 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1982cc2df3c..f6be219aba3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -320,32 +320,28 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected CompletableFuture<Policies> 
getNamespacePoliciesAsync(NamespaceName namespaceName) {
-        return 
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
-            if (policies.isPresent()) {
-                return pulsar()
-                        .getNamespaceService()
-                        .getNamespaceBundleFactory()
-                        .getBundlesAsync(namespaceName)
-                        .thenCompose(bundles -> {
-                    BundlesData bundleData = null;
-                    try {
-                        bundleData = bundles.getBundlesData();
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to get namespace policies {}", 
clientAppId(), namespaceName, e);
-                        return FutureUtil.failedFuture(new RestException(e));
-                    }
-                    policies.get().bundles = bundleData != null ? bundleData : 
policies.get().bundles;
-                    if (policies.get().is_allow_auto_update_schema == null) {
-                        // the type changed from boolean to Boolean. return 
broker value here for keeping compatibility.
-                        policies.get().is_allow_auto_update_schema = 
pulsar().getConfig()
-                                .isAllowAutoUpdateSchemaEnabled();
+        CompletableFuture<Policies> result = new CompletableFuture<>();
+        namespaceResources().getPoliciesAsync(namespaceName)
+                
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, 
localPolicies) -> {
+                    if (pl.isPresent()) {
+                        Policies policies = pl.get();
+                        localPolicies.ifPresent(value -> policies.bundles = 
value.bundles);
+                        if (policies.is_allow_auto_update_schema == null) {
+                            // the type changed from boolean to Boolean. return
+                            // broker value here for keeping compatibility.
+                            policies.is_allow_auto_update_schema = 
pulsar().getConfig()
+                                    .isAllowAutoUpdateSchemaEnabled();
+                        }
+                        result.complete(policies);
+                    } else {
+                        result.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
                     }
-                    return CompletableFuture.completedFuture(policies.get());
+                    return null;
+                }).exceptionally(ex -> {
+                    result.completeExceptionally(ex.getCause());
+                    return null;
                 });
-            } else {
-                return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
-            }
-        });
+        return result;
     }
 
     protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0d58055f236..7aeb2de96ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -726,9 +726,21 @@ public abstract class NamespacesBase extends AdminResource 
{
                                                     "Invalid cluster id: " + 
clusterId);
                                         }
                                         return 
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
-                                                .thenCompose(__ ->
-                                                        
validateClusterForTenantAsync(
-                                                                
namespaceName.getTenant(), clusterId));
+                                                .thenCompose(__ -> 
getNamespacePoliciesAsync(this.namespaceName)
+                                                        
.thenCompose(nsPolicies -> {
+                                                            if 
(nsPolicies.allowed_clusters.isEmpty()) {
+                                                                return 
validateClusterForTenantAsync(
+                                                                        
namespaceName.getTenant(), clusterId);
+                                                            }
+                                                            if 
(!nsPolicies.allowed_clusters.contains(clusterId)) {
+                                                                String msg = 
String.format("Cluster [%s] is not in the "
+                                                                        + 
"list of allowed clusters list for namespace "
+                                                                        + 
"[%s]", clusterId, namespaceName.toString());
+                                                                log.info(msg);
+                                                                throw new 
RestException(Status.FORBIDDEN, msg);
+                                                            }
+                                                            return 
CompletableFuture.completedFuture(null);
+                                                        }));
                                     }).collect(Collectors.toList());
                             return FutureUtil.waitForAll(futures).thenApply(__ 
-> replicationClusterSet);
                         }))
@@ -2719,4 +2731,65 @@ public abstract class NamespacesBase extends 
AdminResource {
                     return null;
                 });
     }
+
+    protected CompletableFuture<Void> 
internalSetNamespaceAllowedClusters(List<String> clusterIds) {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                // Allowed clusters in the namespace policy should be included 
in the allowed clusters in the tenant
+                // policy.
+                .thenCompose(__ -> 
FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
+                        
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
+                        .collect(Collectors.toList())))
+                // Allowed clusters should include all the existed replication 
clusters and could not contain global
+                // cluster.
+                .thenCompose(__ -> {
+                    checkNotNull(clusterIds, "ClusterIds should not be null");
+                    if (clusterIds.contains("global")) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot specify global in the list of allowed 
clusters");
+                    }
+                    return 
getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
+                        
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
+                            if (!clusterIds.contains(replicationCluster)) {
+                                throw new RestException(Status.BAD_REQUEST,
+                                        String.format("Allowed clusters do not 
contain the replication cluster %s. "
+                                                + "Please remove the 
replication cluster if the cluster is not allowed "
+                                                + "for this namespace", 
replicationCluster));
+                            }
+                        });
+                        return Sets.newHashSet(clusterIds);
+                    });
+                })
+                // Verify the allowed clusters are valid and they do not 
contain the peer clusters.
+                .thenCompose(allowedClusters -> clustersAsync()
+                        .thenCompose(clusters -> {
+                            List<CompletableFuture<Void>> futures =
+                                    allowedClusters.stream().map(clusterId -> {
+                                        if (!clusters.contains(clusterId)) {
+                                            throw new 
RestException(Status.FORBIDDEN,
+                                                    "Invalid cluster id: " + 
clusterId);
+                                        }
+                                        return 
validatePeerClusterConflictAsync(clusterId, allowedClusters);
+                                    }).collect(Collectors.toList());
+                            return FutureUtil.waitForAll(futures).thenApply(__ 
-> allowedClusters);
+                        }))
+                // Update allowed clusters into policies.
+                .thenCompose(allowedClusterSet -> 
updatePoliciesAsync(namespaceName, policies -> {
+                    policies.allowed_clusters = allowedClusterSet;
+                    return policies;
+                }));
+    }
+
+    protected CompletableFuture<Set<String>> 
internalGetNamespaceAllowedClustersAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
+                .thenAccept(__ -> {
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot get the allowed clusters for a 
non-global namespace");
+                    }
+                }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.allowed_clusters);
+    }
+
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index d673d925e77..03c23ca6563 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2976,5 +2976,51 @@ public class Namespaces extends NamespacesBase {
                 });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/allowedClusters")
+    @ApiOperation(value = "Set the allowed clusters for a namespace.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "The list of allowed clusters 
should include all replication clusters."),
+            @ApiResponse(code = 403, message = "The requester does not have 
admin permissions."),
+            @ApiResponse(code = 404, message = "The specified tenant, cluster, 
or namespace does not exist."),
+            @ApiResponse(code = 409, message = "A peer-cluster cannot be part 
of an allowed-cluster."),
+            @ApiResponse(code = 412, message = "The namespace is not global or 
the provided cluster IDs are invalid.")})
+    public void setNamespaceAllowedClusters(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace,
+                                                @ApiParam(value = "List of 
allowed clusters", required = true)
+                                                List<String> clusterIds) {
+        validateNamespaceName(tenant, namespace);
+        internalSetNamespaceAllowedClusters(clusterIds)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to set namespace allowed clusters 
on namespace {}",
+                            clientAppId(), namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/allowedClusters")
+    @ApiOperation(value = "Get the allowed clusters for a namespace.",
+            response = String.class, responseContainer = "List")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
+            @ApiResponse(code = 412, message = "Namespace is not global")})
+    public void getNamespaceAllowedClusters(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalGetNamespaceAllowedClustersAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    log.error("[{}] Failed to get namespace allowed clusters 
on namespace {}", clientAppId(),
+                            namespace, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3eb7648614f..5478327bb81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1744,52 +1744,78 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         if (log.isDebugEnabled()) {
             log.debug("[{}] Checking replication status", name);
         }
-
         List<String> configuredClusters = 
topicPolicies.getReplicationClusters().get();
         if (CollectionUtils.isEmpty(configuredClusters)) {
             log.warn("[{}] No replication clusters configured", name);
             return CompletableFuture.completedFuture(null);
         }
 
-        int newMessageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
-
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
 
-        // if local cluster is removed from global namespace cluster-list : 
then delete topic forcefully
-        // because pulsar doesn't serve global topic without local 
repl-cluster configured.
-        if (TopicName.get(topic).isGlobal() && 
!configuredClusters.contains(localCluster)) {
-            log.info("Deleting topic [{}] because local cluster is not part of 
"
-                    + " global namespace repl list {}", topic, 
configuredClusters);
-            return deleteForcefully();
-        }
-
-        removeTerminatedReplicators(replicators);
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-
-        // Check for missing replicators
-        for (String cluster : configuredClusters) {
-            if (cluster.equals(localCluster)) {
-                continue;
-            }
-            if (!replicators.containsKey(cluster)) {
-                futures.add(startReplicator(cluster));
-            }
-        }
-
-        // Check for replicators to be stopped
-        replicators.forEach((cluster, replicator) -> {
-            // Update message TTL
-            ((PersistentReplicator) 
replicator).updateMessageTTL(newMessageTTLInSeconds);
-            if (!cluster.equals(localCluster)) {
-                if (!configuredClusters.contains(cluster)) {
-                    futures.add(removeReplicator(cluster));
+        return checkAllowedCluster(localCluster).thenCompose(success -> {
+            if (!success) {
+                // if local cluster is removed from global namespace 
cluster-list : then delete topic forcefully
+                // because pulsar doesn't serve global topic without local 
repl-cluster configured.
+                return deleteForcefully();
+            }
+
+            int newMessageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
+
+            removeTerminatedReplicators(replicators);
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+            // The replication clusters at namespace level will get local 
cluster when creating a namespace.
+            // If there are only one cluster in the replication clusters, it 
means the replication is not enabled.
+            // If the cluster 1 and cluster 2 use the same configuration store 
and the namespace is created in cluster1
+            // without enabling geo-replication, then the replication clusters 
always has cluster1.
+            //
+            // When a topic under the namespace is load in the cluster2, the 
`cluster1` may be identified as
+            // remote cluster and start geo-replication. This check is to 
avoid the above case.
+            if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
+                // Check for missing replicators
+                for (String cluster : configuredClusters) {
+                    if (cluster.equals(localCluster)) {
+                        continue;
+                    }
+                    if (!replicators.containsKey(cluster)) {
+                        futures.add(startReplicator(cluster));
+                    }
                 }
+                // Check for replicators to be stopped
+                replicators.forEach((cluster, replicator) -> {
+                    // Update message TTL
+                    ((PersistentReplicator) 
replicator).updateMessageTTL(newMessageTTLInSeconds);
+                    if (!cluster.equals(localCluster)) {
+                        if (!configuredClusters.contains(cluster)) {
+                            futures.add(removeReplicator(cluster));
+                        }
+                    }
+                });
             }
-        });
 
-        futures.add(checkShadowReplication());
+            futures.add(checkShadowReplication());
 
-        return FutureUtil.waitForAll(futures);
+            return FutureUtil.waitForAll(futures);
+        });
+    }
+
+    private CompletableFuture<Boolean> checkAllowedCluster(String 
localCluster) {
+        List<String> replicationClusters = 
topicPolicies.getReplicationClusters().get();
+        return 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
+                
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional
 -> {
+                    Set<String> allowedClusters = Set.of();
+                    if (policiesOptional.isPresent()) {
+                        allowedClusters = 
policiesOptional.get().allowed_clusters;
+                    }
+                    if (TopicName.get(topic).isGlobal() && 
!replicationClusters.contains(localCluster)
+                            && !allowedClusters.contains(localCluster)) {
+                        log.warn("Local cluster {} is not part of global 
namespace repl list {} and allowed list {}",
+                                localCluster, replicationClusters, 
allowedClusters);
+                        return CompletableFuture.completedFuture(false);
+                    } else {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                });
     }
 
     private CompletableFuture<Void> checkShadowReplication() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 2726ccc7c98..f9b1f0d08c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -905,14 +905,16 @@ public abstract class PulsarWebResource {
                     log.warn(msg);
                     validationFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND,
                             "Namespace is deleted"));
-                } else if (policies.replication_clusters.isEmpty()) {
+                } else if (policies.replication_clusters.isEmpty() && 
policies.allowed_clusters.isEmpty()) {
                     String msg = String.format(
                             "Namespace does not have any clusters configured : 
local_cluster=%s ns=%s",
                             localCluster, namespace.toString());
                     log.warn(msg);
                     validationFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED, msg));
-                } else if 
(!policies.replication_clusters.contains(localCluster)) {
-                    getOwnerFromPeerClusterListAsync(pulsarService, 
policies.replication_clusters)
+                } else if 
(!policies.replication_clusters.contains(localCluster) && 
!policies.allowed_clusters
+                        .contains(localCluster)) {
+                    getOwnerFromPeerClusterListAsync(pulsarService, 
policies.replication_clusters,
+                            policies.allowed_clusters)
                             .thenAccept(ownerPeerCluster -> {
                                 if (ownerPeerCluster != null) {
                                     // found a peer that own this namespace
@@ -952,9 +954,9 @@ public abstract class PulsarWebResource {
     }
 
     private static CompletableFuture<ClusterDataImpl> 
getOwnerFromPeerClusterListAsync(PulsarService pulsar,
-            Set<String> replicationClusters) {
+            Set<String> replicationClusters, Set<String> allowedClusters) {
         String currentCluster = pulsar.getConfiguration().getClusterName();
-        if (replicationClusters == null || replicationClusters.isEmpty() || 
isBlank(currentCluster)) {
+        if (replicationClusters.isEmpty() && allowedClusters.isEmpty() || 
isBlank(currentCluster)) {
             return CompletableFuture.completedFuture(null);
         }
 
@@ -964,7 +966,8 @@ public abstract class PulsarWebResource {
                         return CompletableFuture.completedFuture(null);
                     }
                     for (String peerCluster : 
cluster.get().getPeerClusterNames()) {
-                        if (replicationClusters.contains(peerCluster)) {
+                        if (replicationClusters.contains(peerCluster)
+                                || allowedClusters.contains(peerCluster)) {
                             return 
pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
                                     .thenApply(ret -> {
                                         if (!ret.isPresent()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 38a60165d56..e975fe3cfa9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -40,6 +41,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -64,6 +66,7 @@ import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -76,8 +79,11 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
@@ -828,6 +834,127 @@ public class NamespaceServiceTest extends BrokerTestBase {
         });
     }
 
+    @Test
+    public void 
testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClustersAtTenantLevel()
 throws Exception {
+        // 1. Setup
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+        Set<String> tenantAllowedClusters = Set.of("test", "r1", "r2");
+        Set<String> allowedClusters1 = Set.of("test", "r1", "r2", "r3");
+        Set<String> allowedClusters2 = Set.of("test", "r1", "r2");
+        Set<String> clusters = Set.of("r1", "r2", "r3", "r4");
+        final String tenant = "my-tenant";
+        final String namespace = tenant + "/testAllowedCluster";
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(namespace);
+        
pulsar.getPulsarResources().getTenantResources().updateTenantAsync(tenant, 
tenantInfo ->
+                
TenantInfo.builder().allowedClusters(tenantAllowedClusters).build());
+        for (String cluster : clusters) {
+            
pulsar.getPulsarResources().getClusterResources().createCluster(cluster, 
ClusterData.builder().build());
+        }
+        // 2. Verify
+        admin.namespaces().setNamespaceAllowedClusters(namespace, 
allowedClusters2);
+
+        try {
+            admin.namespaces().setNamespaceAllowedClusters(namespace, 
allowedClusters1);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 403);
+            assertEquals(e.getMessage(),
+                    "Cluster [r3] is not in the list of allowed clusters list 
for tenant [my-tenant]");
+        }
+        // 3. Clean up
+        admin.namespaces().deleteNamespace(namespace, true);
+        admin.tenants().deleteTenant(tenant, true);
+        for (String cluster : clusters) {
+            
pulsar.getPulsarResources().getClusterResources().deleteCluster(cluster);
+        }
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+    }
+
+    /**
+     * Test case:
+     *      1. Replication clusters should be included in the allowed 
clusters. For compatibility, the replication
+     *      clusters could be set before the allowed clusters are set.
+     *      2. Peer cluster can not be a part of the allowed clusters.
+     */
+    @Test
+    public void 
testNewAllowedClusterAdminAPIAndItsImpactOnReplicationClusterAPI() throws 
Exception {
+        // 1. Setup
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+        // Setup: Prepare cluster resource, tenant and namespace
+        Set<String> replicationClusters = Set.of("test", "r1", "r2");
+        Set<String> tenantAllowedClusters = Set.of("test", "r1", "r2", "r3");
+        Set<String> allowedClusters = Set.of("test", "r1", "r2", "r3");
+        Set<String> clusters = Set.of("r1", "r2", "r3", "r4");
+        final String tenant = "my-tenant";
+        final String namespace = tenant + "/testAllowedCluster";
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(namespace);
+        
pulsar.getPulsarResources().getTenantResources().updateTenantAsync(tenant, 
tenantInfo ->
+                
TenantInfo.builder().allowedClusters(tenantAllowedClusters).build());
+
+        Namespaces namespaces = admin.namespaces();
+        for (String cluster : clusters) {
+            
pulsar.getPulsarResources().getClusterResources().createCluster(cluster, 
ClusterData.builder().build());
+        }
+        // 2. Verify
+        // 2.1 Replication clusters should be included in the allowed clusters.
+
+        // SUCCESS
+        // 2.1.1. Set replication clusters without allowed clusters at 
namespace level.
+        namespaces.setNamespaceReplicationClusters(namespace, 
replicationClusters);
+        // 2..1.2 Set allowed clusters.
+        namespaces.setNamespaceAllowedClusters(namespace, allowedClusters);
+        // 2.1.3. Get allowed clusters and replication clusters.
+        List<String> allowedClustersResponse = 
namespaces.getNamespaceAllowedClusters(namespace);
+
+        List<String> replicationClustersResponse = 
namespaces.getNamespaceReplicationClusters(namespace);
+
+        assertEquals(replicationClustersResponse.size(), 
replicationClusters.size());
+        assertEquals(allowedClustersResponse.size(), allowedClusters.size());
+
+        // FAIL
+        // 2.1.4. Fail: Set allowed clusters whose scope is smaller than 
replication clusters.
+        Set<String> allowedClustersSmallScope = Set.of("r1", "r3");
+        try {
+            namespaces.setNamespaceAllowedClusters(namespace, 
allowedClustersSmallScope);
+            fail();
+        } catch (PulsarAdminException ignore) {}
+        // 2.1.5. Fail: Set replication clusters whose scope is excel the 
allowed clusters.
+        Set<String> replicationClustersExcel = Set.of("r1", "r4");
+        try {
+            namespaces.setNamespaceReplicationClusters(namespace, 
replicationClustersExcel);
+            fail();
+            //Todo: The status code in the old implementation is confused.
+        } catch (PulsarAdminException.NotAuthorizedException ignore) {}
+
+        // 2.2 Peer cluster can not be a part of the allowed clusters.
+        LinkedHashSet<String> peerCluster = new LinkedHashSet<>();
+        peerCluster.add("r2");
+        pulsar.getPulsarResources().getClusterResources().deleteCluster("r1");
+        pulsar.getPulsarResources().getClusterResources().createCluster("r1",
+                ClusterData.builder().peerClusterNames(peerCluster).build());
+        try {
+            namespaces.setNamespaceAllowedClusters(namespace, Set.of("test", 
"r1", "r2", "r3"));
+            fail();
+        } catch (PulsarAdminException.ConflictException ignore) {}
+
+        // CleanUp: Namespace with replication clusters can not be deleted by 
force.
+        namespaces.setNamespaceReplicationClusters(namespace, 
Set.of(conf.getClusterName()));
+        admin.namespaces().deleteNamespace(namespace, true);
+        admin.tenants().deleteTenant(tenant, true);
+        for (String cluster : clusters) {
+            
pulsar.getPulsarResources().getClusterResources().deleteCluster(cluster);
+        }
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+    }
+
     /**
      * 1. Manually trigger "LoadReportUpdaterTask"
      * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 3cc2ca2457a..75ff51055fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -1782,6 +1783,61 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
     }
 
+    @Test
+    public void testEnableReplicationWithNamespaceAllowedClustersPolices() 
throws Exception {
+        log.info("--- testEnableReplicationWithNamespaceAllowedClustersPolices 
---");
+        String namespace1 = "pulsar/ns" + RandomUtils.nextLong();
+        admin1.namespaces().createNamespace(namespace1);
+        admin2.namespaces().createNamespace(namespace1 + "init_cluster_node");
+        admin1.namespaces().setNamespaceAllowedClusters(namespace1, 
Sets.newHashSet("r1", "r2"));
+        final TopicName topicName = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace1 + 
"/testReplicatorProducerNotExceed1"));
+
+        @Cleanup PulsarClient client1 = PulsarClient
+                .builder()
+                .serviceUrl(pulsar1.getBrokerServiceUrl())
+                .build();
+        @Cleanup Producer<byte[]> producer = client1
+                .newProducer()
+                .topic(topicName.toString())
+                .create();
+        producer.newMessage().send();
+        // Enable replication at the topic level in the cluster1.
+        admin1.topics().setReplicationClusters(topicName.toString(), 
List.of("r1", "r2"));
+
+        PersistentTopic persistentTopic1 = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName.toString(),
+                false)
+                .get()
+                .get();
+        // Verify the replication from cluster1 to cluster2 is ready, but the 
replication form the cluster2 to cluster1
+        // is not ready.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, Replicator> replicatorMap = 
persistentTopic1.getReplicators();
+            assertEquals(replicatorMap.size(), 1);
+            Replicator replicator = 
replicatorMap.get(replicatorMap.keys().get(0));
+            assertTrue(replicator.isConnected());
+        });
+
+        PersistentTopic persistentTopic2 = (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName.toString(),
+                        false)
+                .get()
+                .get();
+
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, Replicator> replicatorMap = 
persistentTopic2.getReplicators();
+            assertEquals(replicatorMap.size(), 0);
+        });
+        // Enable replication at the topic level in the cluster2.
+        admin2.topics().setReplicationClusters(topicName.toString(), 
List.of("r1", "r2"));
+        //  Verify the replication between cluster1 and cluster2  is ready.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, Replicator> replicatorMap = 
persistentTopic2.getReplicators();
+            assertEquals(replicatorMap.size(), 1);
+            Replicator replicator = 
replicatorMap.get(replicatorMap.keys().get(0));
+            assertTrue(replicator.isConnected());
+        });
+    }
+
     private void pauseReplicator(PersistentReplicator replicator) {
         Awaitility.await().untilAsserted(() -> {
             assertTrue(replicator.isConnected());
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 2690df658b7..32c659dc01d 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -4623,4 +4623,88 @@ public interface Namespaces {
      * @return
      */
     CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace);
+
+    /**
+     * Get the allowed clusters for a namespace.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>["use", "usw", "usc"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PreconditionFailedException
+     *             Namespace is not global
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getNamespaceAllowedClusters(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Get the allowed clusters for a namespace asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>["use", "usw", "usc"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<List<String>> getNamespaceAllowedClustersAsync(String 
namespace);
+
+    /**
+     * Set the allowed clusters for a namespace.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>["us-west", "us-east", "us-cent"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param clusterIds
+     *            Pulsar Cluster Ids
+     *
+     * @throws ConflictException
+     *             Peer-cluster cannot be part of an allowed-cluster
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PreconditionFailedException
+     *             Namespace is not global
+     * @throws PreconditionFailedException
+     *             Invalid cluster ids
+     * @throws PulsarAdminException
+     *             The list of allowed clusters should include all replication 
clusters.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setNamespaceAllowedClusters(String namespace, Set<String> clusterIds) 
throws PulsarAdminException;
+
+    /**
+     * Set the allowed clusters for a namespace asynchronously.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>["us-west", "us-east", "us-cent"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param clusterIds
+     *            Pulsar Cluster Ids
+     */
+    CompletableFuture<Void> setNamespaceAllowedClustersAsync(String namespace, 
Set<String> clusterIds);
+
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 066fdf1df4f..4e0c68bed3a 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -36,6 +36,8 @@ public class Policies {
     public final AuthPolicies auth_policies = AuthPolicies.builder().build();
     @SuppressWarnings("checkstyle:MemberName")
     public Set<String> replication_clusters = new HashSet<>();
+    @SuppressWarnings("checkstyle:MemberName")
+    public Set<String> allowed_clusters = new HashSet<>();
     public BundlesData bundles;
     @SuppressWarnings("checkstyle:MemberName")
     public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map 
= new HashMap<>();
@@ -135,7 +137,7 @@ public class Policies {
 
     @Override
     public int hashCode() {
-        return Objects.hash(auth_policies, replication_clusters,
+        return Objects.hash(auth_policies, replication_clusters, 
allowed_clusters,
                 backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
                 topicDispatchRate, subscriptionDispatchRate, 
replicatorDispatchRate,
                 clusterSubscribeRate, deduplicationEnabled, 
autoTopicCreationOverride,
@@ -165,6 +167,7 @@ public class Policies {
             Policies other = (Policies) obj;
             return Objects.equals(auth_policies, other.auth_policies)
                     && Objects.equals(replication_clusters, 
other.replication_clusters)
+                    && Objects.equals(allowed_clusters, other.allowed_clusters)
                     && Objects.equals(backlog_quota_map, 
other.backlog_quota_map)
                     && Objects.equals(clusterDispatchRate, 
other.clusterDispatchRate)
                     && Objects.equals(topicDispatchRate, 
other.topicDispatchRate)
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 59f0ef3b347..792fbdc91d1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1950,4 +1950,26 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         WebTarget path = namespacePath(ns, "entryFilters");
         return asyncDeleteRequest(path);
     }
+
+    @Override
+    public List<String> getNamespaceAllowedClusters(String namespace) throws 
PulsarAdminException {
+        return sync(() -> getNamespaceAllowedClustersAsync(namespace));
+    }
+
+    @Override
+    public CompletableFuture<List<String>> 
getNamespaceAllowedClustersAsync(String namespace) {
+        return asyncGetNamespaceParts(new FutureCallback<List<String>>(){}, 
namespace, "allowedClusters");
+    }
+
+    @Override
+    public void setNamespaceAllowedClusters(String namespace, Set<String> 
clusterIds) throws PulsarAdminException {
+        sync(() -> setNamespaceAllowedClustersAsync(namespace, clusterIds));
+    }
+
+    @Override
+    public CompletableFuture<Void> setNamespaceAllowedClustersAsync(String 
namespace, Set<String> clusterIds) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "allowedClusters");
+        return asyncPostRequest(path, Entity.entity(clusterIds, 
MediaType.APPLICATION_JSON));
+    }
 }
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5e7a9e92d15..b1768eee3c2 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -396,6 +396,14 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-clusters myprop/clust/ns1"));
         
verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1");
 
+        namespaces.run(split("set-allowed-clusters myprop/clust/ns1 -c 
use,usw,usc"));
+        verify(mockNamespaces).setNamespaceAllowedClusters("myprop/clust/ns1",
+                Sets.newHashSet("use", "usw", "usc"));
+
+        namespaces.run(split("get-allowed-clusters myprop/clust/ns1"));
+        verify(mockNamespaces).getNamespaceAllowedClusters("myprop/clust/ns1");
+
+
         namespaces.run(split("set-subscription-types-enabled myprop/clust/ns1 
-t Shared,Failover"));
         verify(mockNamespaces).setSubscriptionTypesEnabled("myprop/clust/ns1",
                 Sets.newHashSet(SubscriptionType.Shared, 
SubscriptionType.Failover));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 9c32041a365..95d23b74fed 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2672,6 +2672,35 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Set allowed clusters for a namespace")
+    private class SetAllowedClusters extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--clusters",
+                "-c" }, description = "Replication Cluster Ids list (comma 
separated values)", required = true)
+        private String clusterIds;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            List<String> clusters = Lists.newArrayList(clusterIds.split(","));
+            getAdmin().namespaces().setNamespaceAllowedClusters(namespace, 
Sets.newHashSet(clusters));
+        }
+    }
+
+    @Parameters(commandDescription = "Get allowed clusters for a namespace")
+    private class GetAllowedClusters extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            
print(getAdmin().namespaces().getNamespaceAllowedClusters(namespace));
+        }
+    }
+
     public CmdNamespaces(Supplier<PulsarAdmin> admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -2699,6 +2728,9 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("get-subscription-types-enabled", new 
GetSubscriptionTypesEnabled());
         jcommander.addCommand("remove-subscription-types-enabled", new 
RemoveSubscriptionTypesEnabled());
 
+        jcommander.addCommand("set-allowed-clusters", new 
SetAllowedClusters());
+        jcommander.addCommand("get-allowed-clusters", new 
GetAllowedClusters());
+
         jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
         jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
         jcommander.addCommand("remove-backlog-quota", new 
RemoveBacklogQuota());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 456d4b9270c..a01e1e90027 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -51,5 +51,6 @@ public enum PolicyName {
     MAX_TOPICS,
     RESOURCEGROUP,
     ENTRY_FILTERS,
-    SHADOW_TOPIC
+    SHADOW_TOPIC,
+    ALLOW_CLUSTERS
 }

Reply via email to