This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7b2e72464b8 [feat][broker][branch-3.0] PIP-321 Introduce
allowed-cluster at the namespace level (#22378) (#22960)
7b2e72464b8 is described below
commit 7b2e72464b83507c0cf17ff0b364a4883d682d1f
Author: Kai Wang <[email protected]>
AuthorDate: Mon Jul 1 10:01:10 2024 +0800
[feat][broker][branch-3.0] PIP-321 Introduce allowed-cluster at the
namespace level (#22378) (#22960)
(cherry-picked from commit
https://github.com/apache/pulsar/commit/36bae695fb07f3ee790bee603149c4c2712187e0)
Co-authored-by: Xiangying Meng
<[email protected]>
---
.../apache/pulsar/broker/admin/AdminResource.java | 44 ++++---
.../pulsar/broker/admin/impl/NamespacesBase.java | 79 ++++++++++++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 46 ++++++++
.../broker/service/persistent/PersistentTopic.java | 94 +++++++++------
.../pulsar/broker/web/PulsarWebResource.java | 15 ++-
.../broker/namespace/NamespaceServiceTest.java | 127 +++++++++++++++++++++
.../pulsar/broker/service/ReplicatorTest.java | 56 +++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 84 ++++++++++++++
.../pulsar/common/policies/data/Policies.java | 5 +-
.../client/admin/internal/NamespacesImpl.java | 22 ++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 8 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 32 ++++++
.../pulsar/common/policies/data/PolicyName.java | 3 +-
13 files changed, 546 insertions(+), 69 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1982cc2df3c..f6be219aba3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -320,32 +320,28 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected CompletableFuture<Policies>
getNamespacePoliciesAsync(NamespaceName namespaceName) {
- return
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
- if (policies.isPresent()) {
- return pulsar()
- .getNamespaceService()
- .getNamespaceBundleFactory()
- .getBundlesAsync(namespaceName)
- .thenCompose(bundles -> {
- BundlesData bundleData = null;
- try {
- bundleData = bundles.getBundlesData();
- } catch (Exception e) {
- log.error("[{}] Failed to get namespace policies {}",
clientAppId(), namespaceName, e);
- return FutureUtil.failedFuture(new RestException(e));
- }
- policies.get().bundles = bundleData != null ? bundleData :
policies.get().bundles;
- if (policies.get().is_allow_auto_update_schema == null) {
- // the type changed from boolean to Boolean. return
broker value here for keeping compatibility.
- policies.get().is_allow_auto_update_schema =
pulsar().getConfig()
- .isAllowAutoUpdateSchemaEnabled();
+ CompletableFuture<Policies> result = new CompletableFuture<>();
+ namespaceResources().getPoliciesAsync(namespaceName)
+
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl,
localPolicies) -> {
+ if (pl.isPresent()) {
+ Policies policies = pl.get();
+ localPolicies.ifPresent(value -> policies.bundles =
value.bundles);
+ if (policies.is_allow_auto_update_schema == null) {
+ // the type changed from boolean to Boolean. return
+ // broker value here for keeping compatibility.
+ policies.is_allow_auto_update_schema =
pulsar().getConfig()
+ .isAllowAutoUpdateSchemaEnabled();
+ }
+ result.complete(policies);
+ } else {
+ result.completeExceptionally(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
- return CompletableFuture.completedFuture(policies.get());
+ return null;
+ }).exceptionally(ex -> {
+ result.completeExceptionally(ex.getCause());
+ return null;
});
- } else {
- return FutureUtil.failedFuture(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
- }
- });
+ return result;
}
protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0d58055f236..7aeb2de96ac 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -726,9 +726,21 @@ public abstract class NamespacesBase extends AdminResource
{
"Invalid cluster id: " +
clusterId);
}
return
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
- .thenCompose(__ ->
-
validateClusterForTenantAsync(
-
namespaceName.getTenant(), clusterId));
+ .thenCompose(__ ->
getNamespacePoliciesAsync(this.namespaceName)
+
.thenCompose(nsPolicies -> {
+ if
(nsPolicies.allowed_clusters.isEmpty()) {
+ return
validateClusterForTenantAsync(
+
namespaceName.getTenant(), clusterId);
+ }
+ if
(!nsPolicies.allowed_clusters.contains(clusterId)) {
+ String msg =
String.format("Cluster [%s] is not in the "
+ +
"list of allowed clusters list for namespace "
+ +
"[%s]", clusterId, namespaceName.toString());
+ log.info(msg);
+ throw new
RestException(Status.FORBIDDEN, msg);
+ }
+ return
CompletableFuture.completedFuture(null);
+ }));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__
-> replicationClusterSet);
}))
@@ -2719,4 +2731,65 @@ public abstract class NamespacesBase extends
AdminResource {
return null;
});
}
+
+ protected CompletableFuture<Void>
internalSetNamespaceAllowedClusters(List<String> clusterIds) {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ // Allowed clusters in the namespace policy should be included
in the allowed clusters in the tenant
+ // policy.
+ .thenCompose(__ ->
FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
+
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
+ .collect(Collectors.toList())))
+ // Allowed clusters should include all the existed replication
clusters and could not contain global
+ // cluster.
+ .thenCompose(__ -> {
+ checkNotNull(clusterIds, "ClusterIds should not be null");
+ if (clusterIds.contains("global")) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot specify global in the list of allowed
clusters");
+ }
+ return
getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
+
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
+ if (!clusterIds.contains(replicationCluster)) {
+ throw new RestException(Status.BAD_REQUEST,
+ String.format("Allowed clusters do not
contain the replication cluster %s. "
+ + "Please remove the
replication cluster if the cluster is not allowed "
+ + "for this namespace",
replicationCluster));
+ }
+ });
+ return Sets.newHashSet(clusterIds);
+ });
+ })
+ // Verify the allowed clusters are valid and they do not
contain the peer clusters.
+ .thenCompose(allowedClusters -> clustersAsync()
+ .thenCompose(clusters -> {
+ List<CompletableFuture<Void>> futures =
+ allowedClusters.stream().map(clusterId -> {
+ if (!clusters.contains(clusterId)) {
+ throw new
RestException(Status.FORBIDDEN,
+ "Invalid cluster id: " +
clusterId);
+ }
+ return
validatePeerClusterConflictAsync(clusterId, allowedClusters);
+ }).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures).thenApply(__
-> allowedClusters);
+ }))
+ // Update allowed clusters into policies.
+ .thenCompose(allowedClusterSet ->
updatePoliciesAsync(namespaceName, policies -> {
+ policies.allowed_clusters = allowedClusterSet;
+ return policies;
+ }));
+ }
+
+ protected CompletableFuture<Set<String>>
internalGetNamespaceAllowedClustersAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
+ .thenAccept(__ -> {
+ if (!namespaceName.isGlobal()) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot get the allowed clusters for a
non-global namespace");
+ }
+ }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.allowed_clusters);
+ }
+
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index d673d925e77..03c23ca6563 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2976,5 +2976,51 @@ public class Namespaces extends NamespacesBase {
});
}
+ @POST
+ @Path("/{tenant}/{namespace}/allowedClusters")
+ @ApiOperation(value = "Set the allowed clusters for a namespace.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "The list of allowed clusters
should include all replication clusters."),
+ @ApiResponse(code = 403, message = "The requester does not have
admin permissions."),
+ @ApiResponse(code = 404, message = "The specified tenant, cluster,
or namespace does not exist."),
+ @ApiResponse(code = 409, message = "A peer-cluster cannot be part
of an allowed-cluster."),
+ @ApiResponse(code = 412, message = "The namespace is not global or
the provided cluster IDs are invalid.")})
+ public void setNamespaceAllowedClusters(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace,
+ @ApiParam(value = "List of
allowed clusters", required = true)
+ List<String> clusterIds) {
+ validateNamespaceName(tenant, namespace);
+ internalSetNamespaceAllowedClusters(clusterIds)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ log.error("[{}] Failed to set namespace allowed clusters
on namespace {}",
+ clientAppId(), namespace, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/allowedClusters")
+ @ApiOperation(value = "Get the allowed clusters for a namespace.",
+ response = String.class, responseContainer = "List")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
+ @ApiResponse(code = 412, message = "Namespace is not global")})
+ public void getNamespaceAllowedClusters(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalGetNamespaceAllowedClustersAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ log.error("[{}] Failed to get namespace allowed clusters
on namespace {}", clientAppId(),
+ namespace, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(Namespaces.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3eb7648614f..5478327bb81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1744,52 +1744,78 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}
-
List<String> configuredClusters =
topicPolicies.getReplicationClusters().get();
if (CollectionUtils.isEmpty(configuredClusters)) {
log.warn("[{}] No replication clusters configured", name);
return CompletableFuture.completedFuture(null);
}
- int newMessageTTLInSeconds =
topicPolicies.getMessageTTLInSeconds().get();
-
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
- // if local cluster is removed from global namespace cluster-list :
then delete topic forcefully
- // because pulsar doesn't serve global topic without local
repl-cluster configured.
- if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
- log.info("Deleting topic [{}] because local cluster is not part of
"
- + " global namespace repl list {}", topic,
configuredClusters);
- return deleteForcefully();
- }
-
- removeTerminatedReplicators(replicators);
- List<CompletableFuture<Void>> futures = new ArrayList<>();
-
- // Check for missing replicators
- for (String cluster : configuredClusters) {
- if (cluster.equals(localCluster)) {
- continue;
- }
- if (!replicators.containsKey(cluster)) {
- futures.add(startReplicator(cluster));
- }
- }
-
- // Check for replicators to be stopped
- replicators.forEach((cluster, replicator) -> {
- // Update message TTL
- ((PersistentReplicator)
replicator).updateMessageTTL(newMessageTTLInSeconds);
- if (!cluster.equals(localCluster)) {
- if (!configuredClusters.contains(cluster)) {
- futures.add(removeReplicator(cluster));
+ return checkAllowedCluster(localCluster).thenCompose(success -> {
+ if (!success) {
+ // if local cluster is removed from global namespace
cluster-list : then delete topic forcefully
+ // because pulsar doesn't serve global topic without local
repl-cluster configured.
+ return deleteForcefully();
+ }
+
+ int newMessageTTLInSeconds =
topicPolicies.getMessageTTLInSeconds().get();
+
+ removeTerminatedReplicators(replicators);
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ // The replication clusters at namespace level will get local
cluster when creating a namespace.
+ // If there are only one cluster in the replication clusters, it
means the replication is not enabled.
+ // If the cluster 1 and cluster 2 use the same configuration store
and the namespace is created in cluster1
+ // without enabling geo-replication, then the replication clusters
always has cluster1.
+ //
+ // When a topic under the namespace is load in the cluster2, the
`cluster1` may be identified as
+ // remote cluster and start geo-replication. This check is to
avoid the above case.
+ if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
+ // Check for missing replicators
+ for (String cluster : configuredClusters) {
+ if (cluster.equals(localCluster)) {
+ continue;
+ }
+ if (!replicators.containsKey(cluster)) {
+ futures.add(startReplicator(cluster));
+ }
}
+ // Check for replicators to be stopped
+ replicators.forEach((cluster, replicator) -> {
+ // Update message TTL
+ ((PersistentReplicator)
replicator).updateMessageTTL(newMessageTTLInSeconds);
+ if (!cluster.equals(localCluster)) {
+ if (!configuredClusters.contains(cluster)) {
+ futures.add(removeReplicator(cluster));
+ }
+ }
+ });
}
- });
- futures.add(checkShadowReplication());
+ futures.add(checkShadowReplication());
- return FutureUtil.waitForAll(futures);
+ return FutureUtil.waitForAll(futures);
+ });
+ }
+
+ private CompletableFuture<Boolean> checkAllowedCluster(String
localCluster) {
+ List<String> replicationClusters =
topicPolicies.getReplicationClusters().get();
+ return
brokerService.pulsar().getPulsarResources().getNamespaceResources()
+
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional
-> {
+ Set<String> allowedClusters = Set.of();
+ if (policiesOptional.isPresent()) {
+ allowedClusters =
policiesOptional.get().allowed_clusters;
+ }
+ if (TopicName.get(topic).isGlobal() &&
!replicationClusters.contains(localCluster)
+ && !allowedClusters.contains(localCluster)) {
+ log.warn("Local cluster {} is not part of global
namespace repl list {} and allowed list {}",
+ localCluster, replicationClusters,
allowedClusters);
+ return CompletableFuture.completedFuture(false);
+ } else {
+ return CompletableFuture.completedFuture(true);
+ }
+ });
}
private CompletableFuture<Void> checkShadowReplication() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 2726ccc7c98..f9b1f0d08c0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -905,14 +905,16 @@ public abstract class PulsarWebResource {
log.warn(msg);
validationFuture.completeExceptionally(new
RestException(Status.NOT_FOUND,
"Namespace is deleted"));
- } else if (policies.replication_clusters.isEmpty()) {
+ } else if (policies.replication_clusters.isEmpty() &&
policies.allowed_clusters.isEmpty()) {
String msg = String.format(
"Namespace does not have any clusters configured :
local_cluster=%s ns=%s",
localCluster, namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED, msg));
- } else if
(!policies.replication_clusters.contains(localCluster)) {
- getOwnerFromPeerClusterListAsync(pulsarService,
policies.replication_clusters)
+ } else if
(!policies.replication_clusters.contains(localCluster) &&
!policies.allowed_clusters
+ .contains(localCluster)) {
+ getOwnerFromPeerClusterListAsync(pulsarService,
policies.replication_clusters,
+ policies.allowed_clusters)
.thenAccept(ownerPeerCluster -> {
if (ownerPeerCluster != null) {
// found a peer that own this namespace
@@ -952,9 +954,9 @@ public abstract class PulsarWebResource {
}
private static CompletableFuture<ClusterDataImpl>
getOwnerFromPeerClusterListAsync(PulsarService pulsar,
- Set<String> replicationClusters) {
+ Set<String> replicationClusters, Set<String> allowedClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
- if (replicationClusters == null || replicationClusters.isEmpty() ||
isBlank(currentCluster)) {
+ if (replicationClusters.isEmpty() && allowedClusters.isEmpty() ||
isBlank(currentCluster)) {
return CompletableFuture.completedFuture(null);
}
@@ -964,7 +966,8 @@ public abstract class PulsarWebResource {
return CompletableFuture.completedFuture(null);
}
for (String peerCluster :
cluster.get().getPeerClusterNames()) {
- if (replicationClusters.contains(peerCluster)) {
+ if (replicationClusters.contains(peerCluster)
+ || allowedClusters.contains(peerCluster)) {
return
pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
.thenApply(ret -> {
if (!ret.isPresent()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 38a60165d56..e975fe3cfa9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -40,6 +41,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -64,6 +66,7 @@ import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -76,8 +79,11 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
@@ -828,6 +834,127 @@ public class NamespaceServiceTest extends BrokerTestBase {
});
}
+ @Test
+ public void
testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClustersAtTenantLevel()
throws Exception {
+ // 1. Setup
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+ Set<String> tenantAllowedClusters = Set.of("test", "r1", "r2");
+ Set<String> allowedClusters1 = Set.of("test", "r1", "r2", "r3");
+ Set<String> allowedClusters2 = Set.of("test", "r1", "r2");
+ Set<String> clusters = Set.of("r1", "r2", "r3", "r4");
+ final String tenant = "my-tenant";
+ final String namespace = tenant + "/testAllowedCluster";
+ admin.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(namespace);
+
pulsar.getPulsarResources().getTenantResources().updateTenantAsync(tenant,
tenantInfo ->
+
TenantInfo.builder().allowedClusters(tenantAllowedClusters).build());
+ for (String cluster : clusters) {
+
pulsar.getPulsarResources().getClusterResources().createCluster(cluster,
ClusterData.builder().build());
+ }
+ // 2. Verify
+ admin.namespaces().setNamespaceAllowedClusters(namespace,
allowedClusters2);
+
+ try {
+ admin.namespaces().setNamespaceAllowedClusters(namespace,
allowedClusters1);
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 403);
+ assertEquals(e.getMessage(),
+ "Cluster [r3] is not in the list of allowed clusters list
for tenant [my-tenant]");
+ }
+ // 3. Clean up
+ admin.namespaces().deleteNamespace(namespace, true);
+ admin.tenants().deleteTenant(tenant, true);
+ for (String cluster : clusters) {
+
pulsar.getPulsarResources().getClusterResources().deleteCluster(cluster);
+ }
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+ }
+
+ /**
+ * Test case:
+ * 1. Replication clusters should be included in the allowed
clusters. For compatibility, the replication
+ * clusters could be set before the allowed clusters are set.
+ * 2. Peer cluster can not be a part of the allowed clusters.
+ */
+ @Test
+ public void
testNewAllowedClusterAdminAPIAndItsImpactOnReplicationClusterAPI() throws
Exception {
+ // 1. Setup
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+ // Setup: Prepare cluster resource, tenant and namespace
+ Set<String> replicationClusters = Set.of("test", "r1", "r2");
+ Set<String> tenantAllowedClusters = Set.of("test", "r1", "r2", "r3");
+ Set<String> allowedClusters = Set.of("test", "r1", "r2", "r3");
+ Set<String> clusters = Set.of("r1", "r2", "r3", "r4");
+ final String tenant = "my-tenant";
+ final String namespace = tenant + "/testAllowedCluster";
+ admin.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(namespace);
+
pulsar.getPulsarResources().getTenantResources().updateTenantAsync(tenant,
tenantInfo ->
+
TenantInfo.builder().allowedClusters(tenantAllowedClusters).build());
+
+ Namespaces namespaces = admin.namespaces();
+ for (String cluster : clusters) {
+
pulsar.getPulsarResources().getClusterResources().createCluster(cluster,
ClusterData.builder().build());
+ }
+ // 2. Verify
+ // 2.1 Replication clusters should be included in the allowed clusters.
+
+ // SUCCESS
+ // 2.1.1. Set replication clusters without allowed clusters at
namespace level.
+ namespaces.setNamespaceReplicationClusters(namespace,
replicationClusters);
+ // 2..1.2 Set allowed clusters.
+ namespaces.setNamespaceAllowedClusters(namespace, allowedClusters);
+ // 2.1.3. Get allowed clusters and replication clusters.
+ List<String> allowedClustersResponse =
namespaces.getNamespaceAllowedClusters(namespace);
+
+ List<String> replicationClustersResponse =
namespaces.getNamespaceReplicationClusters(namespace);
+
+ assertEquals(replicationClustersResponse.size(),
replicationClusters.size());
+ assertEquals(allowedClustersResponse.size(), allowedClusters.size());
+
+ // FAIL
+ // 2.1.4. Fail: Set allowed clusters whose scope is smaller than
replication clusters.
+ Set<String> allowedClustersSmallScope = Set.of("r1", "r3");
+ try {
+ namespaces.setNamespaceAllowedClusters(namespace,
allowedClustersSmallScope);
+ fail();
+ } catch (PulsarAdminException ignore) {}
+ // 2.1.5. Fail: Set replication clusters whose scope is excel the
allowed clusters.
+ Set<String> replicationClustersExcel = Set.of("r1", "r4");
+ try {
+ namespaces.setNamespaceReplicationClusters(namespace,
replicationClustersExcel);
+ fail();
+ //Todo: The status code in the old implementation is confused.
+ } catch (PulsarAdminException.NotAuthorizedException ignore) {}
+
+ // 2.2 Peer cluster can not be a part of the allowed clusters.
+ LinkedHashSet<String> peerCluster = new LinkedHashSet<>();
+ peerCluster.add("r2");
+ pulsar.getPulsarResources().getClusterResources().deleteCluster("r1");
+ pulsar.getPulsarResources().getClusterResources().createCluster("r1",
+ ClusterData.builder().peerClusterNames(peerCluster).build());
+ try {
+ namespaces.setNamespaceAllowedClusters(namespace, Set.of("test",
"r1", "r2", "r3"));
+ fail();
+ } catch (PulsarAdminException.ConflictException ignore) {}
+
+ // CleanUp: Namespace with replication clusters can not be deleted by
force.
+ namespaces.setNamespaceReplicationClusters(namespace,
Set.of(conf.getClusterName()));
+ admin.namespaces().deleteNamespace(namespace, true);
+ admin.tenants().deleteTenant(tenant, true);
+ for (String cluster : clusters) {
+
pulsar.getPulsarResources().getClusterResources().deleteCluster(cluster);
+ }
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+ }
+
/**
* 1. Manually trigger "LoadReportUpdaterTask"
* 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 3cc2ca2457a..75ff51055fc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -1782,6 +1783,61 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
}
+ @Test
+ public void testEnableReplicationWithNamespaceAllowedClustersPolices()
throws Exception {
+ log.info("--- testEnableReplicationWithNamespaceAllowedClustersPolices
---");
+ String namespace1 = "pulsar/ns" + RandomUtils.nextLong();
+ admin1.namespaces().createNamespace(namespace1);
+ admin2.namespaces().createNamespace(namespace1 + "init_cluster_node");
+ admin1.namespaces().setNamespaceAllowedClusters(namespace1,
Sets.newHashSet("r1", "r2"));
+ final TopicName topicName = TopicName.get(
+ BrokerTestUtil.newUniqueName("persistent://" + namespace1 +
"/testReplicatorProducerNotExceed1"));
+
+ @Cleanup PulsarClient client1 = PulsarClient
+ .builder()
+ .serviceUrl(pulsar1.getBrokerServiceUrl())
+ .build();
+ @Cleanup Producer<byte[]> producer = client1
+ .newProducer()
+ .topic(topicName.toString())
+ .create();
+ producer.newMessage().send();
+ // Enable replication at the topic level in the cluster1.
+ admin1.topics().setReplicationClusters(topicName.toString(),
List.of("r1", "r2"));
+
+ PersistentTopic persistentTopic1 = (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName.toString(),
+ false)
+ .get()
+ .get();
+ // Verify the replication from cluster1 to cluster2 is ready, but the
replication form the cluster2 to cluster1
+ // is not ready.
+ Awaitility.await().untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, Replicator> replicatorMap =
persistentTopic1.getReplicators();
+ assertEquals(replicatorMap.size(), 1);
+ Replicator replicator =
replicatorMap.get(replicatorMap.keys().get(0));
+ assertTrue(replicator.isConnected());
+ });
+
+ PersistentTopic persistentTopic2 = (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName.toString(),
+ false)
+ .get()
+ .get();
+
+ Awaitility.await().untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, Replicator> replicatorMap =
persistentTopic2.getReplicators();
+ assertEquals(replicatorMap.size(), 0);
+ });
+ // Enable replication at the topic level in the cluster2.
+ admin2.topics().setReplicationClusters(topicName.toString(),
List.of("r1", "r2"));
+ // Verify the replication between cluster1 and cluster2 is ready.
+ Awaitility.await().untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, Replicator> replicatorMap =
persistentTopic2.getReplicators();
+ assertEquals(replicatorMap.size(), 1);
+ Replicator replicator =
replicatorMap.get(replicatorMap.keys().get(0));
+ assertTrue(replicator.isConnected());
+ });
+ }
+
private void pauseReplicator(PersistentReplicator replicator) {
Awaitility.await().untilAsserted(() -> {
assertTrue(replicator.isConnected());
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 2690df658b7..32c659dc01d 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -4623,4 +4623,88 @@ public interface Namespaces {
* @return
*/
CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace);
+
+ /**
+ * Get the allowed clusters for a namespace.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>["use", "usw", "usc"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PreconditionFailedException
+ * Namespace is not global
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<String> getNamespaceAllowedClusters(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Get the allowed clusters for a namespace asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>["use", "usw", "usc"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<List<String>> getNamespaceAllowedClustersAsync(String
namespace);
+
+ /**
+ * Set the allowed clusters for a namespace.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>["us-west", "us-east", "us-cent"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param clusterIds
+ * Pulsar Cluster Ids
+ *
+ * @throws ConflictException
+ * Peer-cluster cannot be part of an allowed-cluster
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PreconditionFailedException
+ * Namespace is not global
+ * @throws PreconditionFailedException
+ * Invalid cluster ids
+ * @throws PulsarAdminException
+ * The list of allowed clusters should include all replication
clusters.
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setNamespaceAllowedClusters(String namespace, Set<String> clusterIds)
throws PulsarAdminException;
+
+ /**
+ * Set the allowed clusters for a namespace asynchronously.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>["us-west", "us-east", "us-cent"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param clusterIds
+ * Pulsar Cluster Ids
+ */
+ CompletableFuture<Void> setNamespaceAllowedClustersAsync(String namespace,
Set<String> clusterIds);
+
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 066fdf1df4f..4e0c68bed3a 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -36,6 +36,8 @@ public class Policies {
public final AuthPolicies auth_policies = AuthPolicies.builder().build();
@SuppressWarnings("checkstyle:MemberName")
public Set<String> replication_clusters = new HashSet<>();
+ @SuppressWarnings("checkstyle:MemberName")
+ public Set<String> allowed_clusters = new HashSet<>();
public BundlesData bundles;
@SuppressWarnings("checkstyle:MemberName")
public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map
= new HashMap<>();
@@ -135,7 +137,7 @@ public class Policies {
@Override
public int hashCode() {
- return Objects.hash(auth_policies, replication_clusters,
+ return Objects.hash(auth_policies, replication_clusters,
allowed_clusters,
backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
topicDispatchRate, subscriptionDispatchRate,
replicatorDispatchRate,
clusterSubscribeRate, deduplicationEnabled,
autoTopicCreationOverride,
@@ -165,6 +167,7 @@ public class Policies {
Policies other = (Policies) obj;
return Objects.equals(auth_policies, other.auth_policies)
&& Objects.equals(replication_clusters,
other.replication_clusters)
+ && Objects.equals(allowed_clusters, other.allowed_clusters)
&& Objects.equals(backlog_quota_map,
other.backlog_quota_map)
&& Objects.equals(clusterDispatchRate,
other.clusterDispatchRate)
&& Objects.equals(topicDispatchRate,
other.topicDispatchRate)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 59f0ef3b347..792fbdc91d1 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1950,4 +1950,26 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
WebTarget path = namespacePath(ns, "entryFilters");
return asyncDeleteRequest(path);
}
+
+ @Override
+ public List<String> getNamespaceAllowedClusters(String namespace) throws
PulsarAdminException {
+ return sync(() -> getNamespaceAllowedClustersAsync(namespace));
+ }
+
+ @Override
+ public CompletableFuture<List<String>>
getNamespaceAllowedClustersAsync(String namespace) {
+ return asyncGetNamespaceParts(new FutureCallback<List<String>>(){},
namespace, "allowedClusters");
+ }
+
+ @Override
+ public void setNamespaceAllowedClusters(String namespace, Set<String>
clusterIds) throws PulsarAdminException {
+ sync(() -> setNamespaceAllowedClustersAsync(namespace, clusterIds));
+ }
+
+ @Override
+ public CompletableFuture<Void> setNamespaceAllowedClustersAsync(String
namespace, Set<String> clusterIds) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "allowedClusters");
+ return asyncPostRequest(path, Entity.entity(clusterIds,
MediaType.APPLICATION_JSON));
+ }
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5e7a9e92d15..b1768eee3c2 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -396,6 +396,14 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-clusters myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1");
+ namespaces.run(split("set-allowed-clusters myprop/clust/ns1 -c
use,usw,usc"));
+ verify(mockNamespaces).setNamespaceAllowedClusters("myprop/clust/ns1",
+ Sets.newHashSet("use", "usw", "usc"));
+
+ namespaces.run(split("get-allowed-clusters myprop/clust/ns1"));
+ verify(mockNamespaces).getNamespaceAllowedClusters("myprop/clust/ns1");
+
+
namespaces.run(split("set-subscription-types-enabled myprop/clust/ns1
-t Shared,Failover"));
verify(mockNamespaces).setSubscriptionTypesEnabled("myprop/clust/ns1",
Sets.newHashSet(SubscriptionType.Shared,
SubscriptionType.Failover));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 9c32041a365..95d23b74fed 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2672,6 +2672,35 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Set allowed clusters for a namespace")
+ private class SetAllowedClusters extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--clusters",
+ "-c" }, description = "Replication Cluster Ids list (comma
separated values)", required = true)
+ private String clusterIds;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ List<String> clusters = Lists.newArrayList(clusterIds.split(","));
+ getAdmin().namespaces().setNamespaceAllowedClusters(namespace,
Sets.newHashSet(clusters));
+ }
+ }
+
+ @Parameters(commandDescription = "Get allowed clusters for a namespace")
+ private class GetAllowedClusters extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+
print(getAdmin().namespaces().getNamespaceAllowedClusters(namespace));
+ }
+ }
+
public CmdNamespaces(Supplier<PulsarAdmin> admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -2699,6 +2728,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-subscription-types-enabled", new
GetSubscriptionTypesEnabled());
jcommander.addCommand("remove-subscription-types-enabled", new
RemoveSubscriptionTypesEnabled());
+ jcommander.addCommand("set-allowed-clusters", new
SetAllowedClusters());
+ jcommander.addCommand("get-allowed-clusters", new
GetAllowedClusters());
+
jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
jcommander.addCommand("remove-backlog-quota", new
RemoveBacklogQuota());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 456d4b9270c..a01e1e90027 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -51,5 +51,6 @@ public enum PolicyName {
MAX_TOPICS,
RESOURCEGROUP,
ENTRY_FILTERS,
- SHADOW_TOPIC
+ SHADOW_TOPIC,
+ ALLOW_CLUSTERS
}