This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fc67fe2 Introduce admin api to get broker and namespace-isolation policy map (#1565) fc67fe2 is described below commit fc67fe2c98eeb7c47023ff2dd29144972a4a693f Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Apr 13 13:17:20 2018 -0700 Introduce admin api to get broker and namespace-isolation policy map (#1565) * Introduce admin api to get broker and namespace-isolation policy map * add missed commit --- .../pulsar/broker/admin/impl/BrokersBase.java | 2 +- .../pulsar/broker/admin/impl/ClustersBase.java | 113 ++++++++++++++++++++- .../pulsar/broker/loadbalance/LoadManager.java | 10 ++ .../broker/loadbalance/ModularLoadManager.java | 8 ++ .../loadbalance/impl/ModularLoadManagerImpl.java | 1 + .../impl/ModularLoadManagerWrapper.java | 6 ++ .../loadbalance/impl/SimpleLoadManagerImpl.java | 5 + .../impl/SimpleResourceAllocationPolicies.java | 3 +- .../pulsar/broker/namespace/NamespaceService.java | 4 +- .../apache/pulsar/broker/admin/AdminApiTest2.java | 50 +++++++++ .../org/apache/pulsar/client/admin/Clusters.java | 23 +++++ .../pulsar/client/admin/internal/ClustersImpl.java | 25 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 21 +++- .../admin/cli/CmdNamespaceIsolationPolicy.java | 36 +++++++ .../data/BrokerNamespaceIsolationData.java | 39 +++++++ 15 files changed, 335 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 60329d7..59027c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -95,7 +95,7 @@ public class BrokersBase extends AdminResource { throw new RestException(e); } } - + @POST @Path("/configuration/{configName}/{configValue}") @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index de670c1..c05ea77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -40,12 +40,15 @@ import javax.ws.rs.core.Response.Status; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -56,6 +59,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.databind.JsonMappingException; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.swagger.annotations.ApiOperation; @@ -262,7 +266,7 @@ public class ClustersBase extends AdminResource { } // check the namespaceIsolationPolicies associated with the cluster - String path = path("clusters", cluster, "namespaceIsolationPolicies"); + String path = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); Optional<NamespaceIsolationPolicies> nsIsolationPolicies = namespaceIsolationPoliciesCache().get(path); // Need to delete the isolation policies if present @@ -332,7 +336,7 @@ public class ClustersBase extends AdminResource { try { NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(path("clusters", cluster, "namespaceIsolationPolicies")) + .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); // construct the response to NamespaceisolationData map @@ -356,7 +360,7 @@ public class ClustersBase extends AdminResource { try { NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(path("clusters", cluster, "namespaceIsolationPolicies")) + .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); // construct the response to NamespaceisolationData map @@ -374,6 +378,105 @@ public class ClustersBase extends AdminResource { } } + @GET + @Path("/{cluster}/namespaceIsolationPolicies/brokers") + @ApiOperation(value = "Get list of brokers with namespace-isolation policies attached to them", response = BrokerNamespaceIsolationData.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace-isolation policies not found"), + @ApiResponse(code = 412, message = "Cluster doesn't exist") }) + public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy( + @PathParam("cluster") String cluster) { + validateSuperUserAccess(); + validateClusterExists(cluster); + + Set<String> availableBrokers; + final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); + Map<String, NamespaceIsolationData> nsPolicies; + try { + availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers(); + } catch (Exception e) { + log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + try { + Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPoliciesCache() + .get(nsIsolationPoliciesPath); + if (!nsPoliciesResult.isPresent()) { + throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster); + } + nsPolicies = nsPoliciesResult.get().getPolicies(); + } catch (Exception e) { + log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e); + throw new RestException(e); + } + return availableBrokers.stream().map(broker -> { + BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData(); + brokerIsolationData.brokerName = broker; + if (nsPolicies != null) { + nsPolicies.forEach((name, policyData) -> { + NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData); + if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) { + if (brokerIsolationData.namespaceRegex == null) { + brokerIsolationData.namespaceRegex = Lists.newArrayList(); + } + brokerIsolationData.namespaceRegex.addAll(policyData.namespaces); + } + }); + } + return brokerIsolationData; + }).collect(Collectors.toList()); + } + + @GET + @Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}") + @ApiOperation(value = "Get a broker with namespace-isolation policies attached to it", response = BrokerNamespaceIsolationData.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found"), + @ApiResponse(code = 412, message = "Cluster doesn't exist") }) + public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(@PathParam("cluster") String cluster, + @PathParam("broker") String broker) { + validateSuperUserAccess(); + validateClusterExists(cluster); + + Set<String> availableBrokers; + final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); + Map<String, NamespaceIsolationData> nsPolicies; + try { + availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers(); + } catch (Exception e) { + log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + if (availableBrokers == null || !availableBrokers.contains(broker)) { + throw new RestException(Status.NOT_FOUND, "Broker is not part of active broker list " + broker); + } + try { + Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPoliciesCache() + .get(nsIsolationPoliciesPath); + if (!nsPoliciesResult.isPresent()) { + throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster); + } + nsPolicies = nsPoliciesResult.get().getPolicies(); + } catch (Exception e) { + log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e); + throw new RestException(e); + } + BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData(); + brokerIsolationData.brokerName = broker; + if (nsPolicies != null) { + nsPolicies.forEach((name, policyData) -> { + NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData); + if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) { + if (brokerIsolationData.namespaceRegex == null) { + brokerIsolationData.namespaceRegex = Lists.newArrayList(); + } + brokerIsolationData.namespaceRegex.addAll(policyData.namespaces); + } + }); + } + return brokerIsolationData; + } + @POST @Path("/{cluster}/namespaceIsolationPolicies/{policyName}") @ApiOperation(value = "Set namespace isolation policy") @@ -389,7 +492,7 @@ public class ClustersBase extends AdminResource { // validate the policy data before creating the node policyData.validate(); - String nsIsolationPolicyPath = path("clusters", cluster, "namespaceIsolationPolicies"); + String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() .get(nsIsolationPolicyPath).orElseGet(() -> { try { @@ -458,7 +561,7 @@ public class ClustersBase extends AdminResource { try { - String nsIsolationPolicyPath = path("clusters", cluster, "namespaceIsolationPolicies"); + String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() .get(nsIsolationPolicyPath).orElseGet(() -> { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 3a7409e..6a9dc38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance; import java.util.List; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -107,6 +108,14 @@ public interface LoadManager { * @throws Exception */ public void disableBroker() throws Exception; + + /** + * Get list of available brokers in cluster + * + * @return + * @throws Exception + */ + Set<String> getAvailableBrokers() throws Exception; public void stop() throws PulsarServerException; @@ -139,4 +148,5 @@ public interface LoadManager { // If we failed to create a load manager, default to SimpleLoadManagerImpl. return new SimpleLoadManagerImpl(pulsar); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java index 9d5603d..369bf5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -103,4 +104,11 @@ public interface ModularLoadManager { * @return */ Deserializer<? extends ServiceLookupData> getLoadReportDeserializer(); + + /** + * Get available broker list in cluster + * + * @return + */ + Set<String> getAvailableBrokers(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 23f651e..714389f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -335,6 +335,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach } } + @Override public Set<String> getAvailableBrokers() { try { return availableActiveBrokers.get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 54b9d56..82d99b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.impl; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -122,4 +123,9 @@ public class ModularLoadManagerWrapper implements LoadManager { public ModularLoadManager getLoadManager() { return loadManager; } + + @Override + public Set<String> getAvailableBrokers() throws Exception { + return loadManager.getAvailableBrokers(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index bbaf8bb..da4c0e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -351,6 +351,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene return this.availableActiveBrokers; } + @Override + public Set<String> getAvailableBrokers() throws Exception { + return this.availableActiveBrokers.get(); + } + public ZooKeeperDataCache<LoadReport> getLoadReportCache() { return this.loadReportCacheZk; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java index fa67c0d..2c1aa99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.LoadReport; import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.ServiceUnit; +import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; @@ -51,7 +52,7 @@ public class SimpleResourceAllocationPolicies { private Optional<NamespaceIsolationPolicies> getIsolationPolicies(String clusterName) { try { return namespaceIsolationPolicies - .get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies")); + .get(AdminResource.path("clusters", clusterName, NAMESPACE_ISOLATION_POLICIES)); } catch (Exception e) { LOG.warn("GetIsolationPolicies: Unable to get the namespaceIsolationPolicies [{}]", e); return Optional.empty(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 2668f95..a712aec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -119,6 +119,8 @@ public class NamespaceService { public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)"); public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s"; public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s"; + + public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies"; /** * Default constructor. @@ -519,7 +521,7 @@ public class NamespaceService { private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception { String localCluster = pulsar.getConfiguration().getClusterName(); return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache() - .get(AdminResource.path("clusters", localCluster, "namespaceIsolationPolicies")).orElseGet(() -> { + .get(AdminResource.path("clusters", localCluster, NAMESPACE_ISOLATION_POLICIES)).orElseGet(() -> { // the namespace isolation policies are empty/undefined = an empty object return new NamespaceIsolationPolicies(); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index c8c3b75..7e4262f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -27,6 +27,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -54,9 +56,13 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -65,6 +71,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicStats; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -805,4 +812,47 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { // Expected } } + + @Test + public void brokerNamespaceIsolationPolicies() throws Exception { + + // create + String policyName1 = "policy-1"; + String namespaceRegex = "other/use/other.*"; + String cluster = "use"; + String brokerName = pulsar.getAdvertisedAddress(); + String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort(); + NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData(); + nsPolicyData1.namespaces = new ArrayList<String>(); + nsPolicyData1.namespaces.add(namespaceRegex); + nsPolicyData1.primary = new ArrayList<String>(); + nsPolicyData1.primary.add(brokerName + ":[0-9]*"); + nsPolicyData1.secondary = new ArrayList<String>(); + nsPolicyData1.secondary.add(brokerName + ".*"); + nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData(); + nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available; + nsPolicyData1.auto_failover_policy.parameters = new HashMap<String, String>(); + nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1"); + nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100"); + admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, nsPolicyData1); + + List<BrokerNamespaceIsolationData> brokerIsolationDataList = admin.clusters() + .getBrokersWithNamespaceIsolationPolicy(cluster); + assertEquals(brokerIsolationDataList.size(), 1); + assertEquals(brokerIsolationDataList.get(0).brokerName, brokerAddress); + assertEquals(brokerIsolationDataList.get(0).namespaceRegex.size(), 1); + assertEquals(brokerIsolationDataList.get(0).namespaceRegex.get(0), namespaceRegex); + + BrokerNamespaceIsolationData brokerIsolationData = admin.clusters() + .getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress); + assertEquals(brokerIsolationData.brokerName, brokerAddress); + assertEquals(brokerIsolationData.namespaceRegex.size(), 1); + assertEquals(brokerIsolationData.namespaceRegex.get(0), namespaceRegex); + + try { + admin.clusters().getBrokerWithNamespaceIsolationPolicy(cluster, "invalid-broker"); + Assert.fail("should have failed due to invalid broker address"); + } catch (PulsarAdminException.NotFoundException e) {// expected + } + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 4048367..54c9c86 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -227,6 +228,28 @@ public interface Clusters { void createNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException; + + /** + * Returns list of active brokers with namespace-isolation policies attached to it. + * + * @param cluster + * @return + * @throws PulsarAdminException + */ + List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(String cluster) + throws PulsarAdminException; + + /** + * Returns active broker with namespace-isolation policies attached to it. + * + * @param cluster + * @param broker + * @return + * @throws PulsarAdminException + */ + BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker) + throws PulsarAdminException; + /** * Update a namespace isolation policy for a cluster diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 562fbee..32cb5c0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -31,6 +31,7 @@ import javax.ws.rs.core.MediaType; import org.apache.pulsar.client.admin.Clusters; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.ErrorData; @@ -124,6 +125,30 @@ public class ClustersImpl extends BaseResource implements Clusters { } } + + @Override + public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(String cluster) + throws PulsarAdminException { + try { + return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers")) + .get(new GenericType<List<BrokerNamespaceIsolationData>>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker) + throws PulsarAdminException { + try { + return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker)) + .get(BrokerNamespaceIsolationData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void createNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index e70f20b..d081ccf 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -23,9 +23,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.util.EnumSet; import org.apache.pulsar.client.admin.BrokerStats; @@ -53,6 +50,9 @@ import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.Test; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + @Test public class PulsarAdminToolTest { @@ -427,6 +427,21 @@ public class PulsarAdminToolTest { } @Test + void namespaceIsolationPolicy() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Clusters mockClusters = mock(Clusters.class); + when(admin.clusters()).thenReturn(mockClusters); + + CmdNamespaceIsolationPolicy nsIsolationPoliciesCmd = new CmdNamespaceIsolationPolicy(admin); + + nsIsolationPoliciesCmd.run(split("brokers use")); + verify(mockClusters).getBrokersWithNamespaceIsolationPolicy("use"); + + nsIsolationPoliciesCmd.run(split("broker use --broker my-broker")); + verify(mockClusters).getBrokerWithNamespaceIsolationPolicy("use", "my-broker"); + } + + @Test void persistentTopics() throws Exception { PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); PersistentTopics mockTopics = mock(PersistentTopics.class); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index f215462..ed3bda6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import com.beust.jcommander.Parameter; @@ -82,6 +83,39 @@ public class CmdNamespaceIsolationPolicy extends CmdBase { } } + @Parameters(commandDescription = "List all brokers with namespace-isolation policies attached to it. This operation requires Pulsar super-user privileges") + private class GetAllBrokersWithPolicies extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private List<String> params; + + void run() throws PulsarAdminException { + String clusterName = getOneArgument(params); + + List<BrokerNamespaceIsolationData> brokers = admin.clusters() + .getBrokersWithNamespaceIsolationPolicy(clusterName); + + print(brokers); + } + } + + @Parameters(commandDescription = "Get broker with namespace-isolation policies attached to it. This operation requires Pulsar super-user privileges") + private class GetBrokerWithPolicies extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private List<String> params; + + @Parameter(names = "--broker", description = "Broker-name to get namespace-isolation policies attached to it", required = true) + private String broker; + + void run() throws PulsarAdminException { + String clusterName = getOneArgument(params); + + BrokerNamespaceIsolationData brokerData = admin.clusters() + .getBrokerWithNamespaceIsolationPolicy(clusterName, broker); + + print(brokerData); + } + } + @Parameters(commandDescription = "Get namespace isolation policy of a cluster. This operation requires Pulsar super-user privileges") private class GetPolicy extends CliCommand { @Parameter(description = "cluster-name policy-name\n", required = true) @@ -195,6 +229,8 @@ public class CmdNamespaceIsolationPolicy extends CmdBase { jcommander.addCommand("get", new GetPolicy()); jcommander.addCommand("list", new GetAllPolicies()); jcommander.addCommand("delete", new DeletePolicy()); + jcommander.addCommand("brokers", new GetAllBrokersWithPolicies()); + jcommander.addCommand("broker", new GetBrokerWithPolicies()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java new file mode 100644 index 0000000..35fc73c --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import java.util.List; + +import com.google.common.base.Objects; + +public class BrokerNamespaceIsolationData { + + public String brokerName; + public List<String> namespaceRegex; //isolated namespace regex + + @Override + public boolean equals(Object obj) { + if (obj instanceof BrokerNamespaceIsolationData) { + BrokerNamespaceIsolationData other = (BrokerNamespaceIsolationData) obj; + return Objects.equal(brokerName, other.brokerName) && Objects.equal(namespaceRegex, other.namespaceRegex); + } + return false; + } + +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.