merlimat closed pull request #1463: Provide v2 support for admin non
persistent topics.
URL: https://github.com/apache/incubator-pulsar/pull/1463
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this is a foreign pull request (from a fork), GitHub would not
otherwise display the diff, so it is reproduced below:
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index d71a4de36..ccd712a72 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -27,7 +27,6 @@
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.NonPersistentTopics;
@@ -42,10 +41,12 @@
public class NonPersistentTopicsImpl extends BaseResource implements
NonPersistentTopics {
private final WebTarget adminNonPersistentTopics;
+ private final WebTarget adminV2NonPersistentTopics;
public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
super(auth);
adminNonPersistentTopics = web.path("/admin/non-persistent");
+ adminV2NonPersistentTopics = web.path("/admin/v2/non-persistent");
}
@Override
@@ -63,10 +64,9 @@ public void createPartitionedTopic(String topic, int
numPartitions) throws Pulsa
@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic,
int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more
than 1");
- TopicName ds = validateTopic(topic);
- return asyncPutRequest(
-
adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
- Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "partitions");
+ return asyncPutRequest(path, Entity.entity(numPartitions,
MediaType.APPLICATION_JSON));
}
@Override
@@ -83,9 +83,10 @@ public PartitionedTopicMetadata
getPartitionedTopicMetadata(String topic) throws
@Override
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadataAsync(String topic) {
- TopicName ds = validateTopic(topic);
+ TopicName topicName = validateTopic(topic);
final CompletableFuture<PartitionedTopicMetadata> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
+ WebTarget path = topicPath(topicName, "partitions");
+ asyncGetRequest(path,
new InvocationCallback<PartitionedTopicMetadata>() {
@Override
@@ -115,9 +116,10 @@ public NonPersistentTopicStats getStats(String topic)
throws PulsarAdminExceptio
@Override
public CompletableFuture<NonPersistentTopicStats> getStatsAsync(String
topic) {
- TopicName ds = validateTopic(topic);
+ TopicName topicName = validateTopic(topic);
final CompletableFuture<NonPersistentTopicStats> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("stats"),
+ WebTarget path = topicPath(topicName, "stats");
+ asyncGetRequest(path,
new InvocationCallback<NonPersistentTopicStats>() {
@Override
@@ -147,9 +149,10 @@ public PersistentTopicInternalStats
getInternalStats(String topic) throws Pulsar
@Override
public CompletableFuture<PersistentTopicInternalStats>
getInternalStatsAsync(String topic) {
- TopicName ds = validateTopic(topic);
+ TopicName topicName = validateTopic(topic);
final CompletableFuture<PersistentTopicInternalStats> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("internalStats"),
+ WebTarget path = topicPath(topicName, "internalStats");
+ asyncGetRequest(path,
new InvocationCallback<PersistentTopicInternalStats>() {
@Override
@@ -179,9 +182,9 @@ public void unload(String topic) throws
PulsarAdminException {
@Override
public CompletableFuture<Void> unloadAsync(String topic) {
- TopicName ds = validateTopic(topic);
- return
asyncPutRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("unload"),
- Entity.entity("", MediaType.APPLICATION_JSON));
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "unload");
+ return asyncPutRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
}
@Override
@@ -200,8 +203,9 @@ public void unload(String topic) throws
PulsarAdminException {
public CompletableFuture<List<String>> getListInBundleAsync(String
namespace, String bundleRange) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<List<String>> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
- .path(bundleRange), new InvocationCallback<List<String>>() {
+ WebTarget path = namespacePath(ns, bundleRange);
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> response) {
future.complete(response);
@@ -230,7 +234,8 @@ public void failed(Throwable throwable) {
public CompletableFuture<List<String>> getListAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<List<String>> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()),
+ WebTarget path = namespacePath(ns);
+ asyncGetRequest(path,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> response) {
@@ -253,4 +258,17 @@ private TopicName validateTopic(String topic) {
return TopicName.get(topic);
}
+ private WebTarget namespacePath(NamespaceName namespace, String... parts) {
+ final WebTarget base = namespace.isV2() ? adminV2NonPersistentTopics :
adminNonPersistentTopics;
+ WebTarget namespacePath = base.path(namespace.toString());
+ namespacePath = WebTargets.addParts(namespacePath, parts);
+ return namespacePath;
+ }
+
+ private WebTarget topicPath(TopicName topic, String... parts) {
+ final WebTarget base = topic.isV2() ? adminV2NonPersistentTopics :
adminNonPersistentTopics;
+ WebTarget topicPath = base.path(topic.getRestPath());
+ topicPath = WebTargets.addParts(topicPath, parts);
+ return topicPath;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services