rdhabalia closed pull request #1268: Add get-peer clusters admin api
URL: https://github.com/apache/incubator-pulsar/pull/1268
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 733105a37..de670c136 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -213,6 +213,28 @@ public void setPeerClusterNames(@PathParam("cluster") String cluster, LinkedHash } } + @GET + @Path("/{cluster}/peers") + @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = Set.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Cluster doesn't exist") }) + public Set<String> getPeerCluster(@PathParam("cluster") String cluster) { + validateSuperUserAccess(); + + try { + String clusterPath = path("clusters", cluster); + byte[] content = globalZk().getData(clusterPath, null, null); + ClusterData clusterData = jsonMapper().readValue(content, ClusterData.class); + return clusterData.getPeerClusterNames(); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to get cluster {}: Does not exist", clientAppId(), cluster); + throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); + } catch (Exception e) { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + } + @DELETE @Path("/{cluster}") @ApiOperation(value = "Delete an existing cluster") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java index c344af55a..d5456fcca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; +import java.util.LinkedHashSet; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -151,6 +154,22 @@ public void testPeerClusterTopicLookup(String protocol) throws Exception { } + @Test + public void testGetPeerClusters() throws Exception { + final String mainClusterName = "r1"; + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), null); + LinkedHashSet<String> peerClusters = Sets.newLinkedHashSet(Lists.newArrayList("r2", "r3")); + admin1.clusters().updatePeerClusterNames(mainClusterName, peerClusters); + retryStrategically((test) -> { + try { + return admin1.clusters().getPeerClusterNames(mainClusterName).size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 100); + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), peerClusters); + } + private static final Logger log = LoggerFactory.getLogger(PeerReplicatorTest.class); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 779e7b117..4048367de 100644 --- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -21,6 +21,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; @@ -133,6 +134,28 @@ * Unexpected error */ void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException; + + /** + * Get peer-cluster names + * <p> + * + * @param cluster + * Cluster name + * @return + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * + * @throws NotFoundException + * Domain doesn't exist + * + * @throws PreconditionFailedException + * Cluster doesn't exist + * + * @throws PulsarAdminException + * Unexpected error + */ + Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException; + /** * Delete an existing cluster diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 666a2e8b0..fcbf01267 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -21,6 +21,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; @@ -94,6 +95,15 @@ public void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClu } + @Override + public Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException { + try { + return 
request(clusters.path(cluster).path("peers")).get(LinkedHashSet.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void deleteCluster(String cluster) throws PulsarAdminException { try { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 5ba6c1077..88f3b3f04 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -127,6 +127,19 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get list of peer-clusters") + private class GetPeerClusters extends CliCommand { + + @Parameter(description = "cluster-name\n", required = true) + private java.util.List<String> params; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + print(admin.clusters().getPeerClusterNames(cluster)); + } + } + + @Parameters(commandDescription = "Create a new failure-domain for a cluster. 
updates it if already created.") private class CreateFailureDomain extends CliCommand { @Parameter(description = "cluster-name\n", required = true) @@ -213,6 +226,7 @@ public CmdClusters(PulsarAdmin admin) { jcommander.addCommand("delete", new Delete()); jcommander.addCommand("list", new List()); jcommander.addCommand("update-peer-clusters", new UpdatePeerClusters()); + jcommander.addCommand("get-peer-clusters", new GetPeerClusters()); jcommander.addCommand("get-failure-domain", new GetFailureDomain()); jcommander.addCommand("create-failure-domain", new CreateFailureDomain()); jcommander.addCommand("update-failure-domain", new UpdateFailureDomain()); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 6328f41d3..16d078551 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -175,6 +175,9 @@ void clusters() throws Exception { clusters.run(split("update-peer-clusters my-cluster --peer-clusters c1,c2")); verify(mockClusters).updatePeerClusterNames("my-cluster", Sets.newLinkedHashSet(Lists.newArrayList("c1", "c2"))); + + clusters.run(split("get-peer-clusters my-cluster")); + verify(mockClusters).getPeerClusterNames("my-cluster"); } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services