merlimat closed pull request #1463: Provide v2 support for  admin non 
persistent topics.
URL: https://github.com/apache/incubator-pulsar/pull/1463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index d71a4de36..ccd712a72 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -27,7 +27,6 @@
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.pulsar.client.admin.NonPersistentTopics;
@@ -42,10 +41,12 @@
 public class NonPersistentTopicsImpl extends BaseResource implements 
NonPersistentTopics {
 
     private final WebTarget adminNonPersistentTopics;
+    private final WebTarget adminV2NonPersistentTopics;
 
     public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
         super(auth);
         adminNonPersistentTopics = web.path("/admin/non-persistent");
+        adminV2NonPersistentTopics = web.path("/admin/v2/non-persistent");
     }
 
     @Override
@@ -63,10 +64,9 @@ public void createPartitionedTopic(String topic, int 
numPartitions) throws Pulsa
     @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, 
int numPartitions) {
         checkArgument(numPartitions > 1, "Number of partitions should be more 
than 1");
-        TopicName ds = validateTopic(topic);
-        return asyncPutRequest(
-                
adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
-                Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "partitions");
+        return asyncPutRequest(path, Entity.entity(numPartitions, 
MediaType.APPLICATION_JSON));
     }
 
     @Override
@@ -83,9 +83,10 @@ public PartitionedTopicMetadata 
getPartitionedTopicMetadata(String topic) throws
 
     @Override
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadataAsync(String topic) {
-        TopicName ds = validateTopic(topic);
+        TopicName topicName = validateTopic(topic);
         final CompletableFuture<PartitionedTopicMetadata> future = new 
CompletableFuture<>();
-        
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
+        WebTarget path = topicPath(topicName, "partitions");
+        asyncGetRequest(path,
                 new InvocationCallback<PartitionedTopicMetadata>() {
 
                     @Override
@@ -115,9 +116,10 @@ public NonPersistentTopicStats getStats(String topic) 
throws PulsarAdminExceptio
 
     @Override
     public CompletableFuture<NonPersistentTopicStats> getStatsAsync(String 
topic) {
-        TopicName ds = validateTopic(topic);
+        TopicName topicName = validateTopic(topic);
         final CompletableFuture<NonPersistentTopicStats> future = new 
CompletableFuture<>();
-        
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("stats"),
+        WebTarget path = topicPath(topicName, "stats");
+        asyncGetRequest(path,
                 new InvocationCallback<NonPersistentTopicStats>() {
 
                     @Override
@@ -147,9 +149,10 @@ public PersistentTopicInternalStats 
getInternalStats(String topic) throws Pulsar
 
     @Override
     public CompletableFuture<PersistentTopicInternalStats> 
getInternalStatsAsync(String topic) {
-        TopicName ds = validateTopic(topic);
+        TopicName topicName = validateTopic(topic);
         final CompletableFuture<PersistentTopicInternalStats> future = new 
CompletableFuture<>();
-        
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("internalStats"),
+        WebTarget path = topicPath(topicName, "internalStats");
+        asyncGetRequest(path,
                 new InvocationCallback<PersistentTopicInternalStats>() {
 
                     @Override
@@ -179,9 +182,9 @@ public void unload(String topic) throws 
PulsarAdminException {
 
     @Override
     public CompletableFuture<Void> unloadAsync(String topic) {
-        TopicName ds = validateTopic(topic);
-        return 
asyncPutRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("unload"),
-                Entity.entity("", MediaType.APPLICATION_JSON));
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "unload");
+        return asyncPutRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
     }
 
     @Override
@@ -200,8 +203,9 @@ public void unload(String topic) throws 
PulsarAdminException {
     public CompletableFuture<List<String>> getListInBundleAsync(String 
namespace, String bundleRange) {
         NamespaceName ns = NamespaceName.get(namespace);
         final CompletableFuture<List<String>> future = new 
CompletableFuture<>();
-        
asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
-                .path(bundleRange), new InvocationCallback<List<String>>() {
+        WebTarget path = namespacePath(ns, bundleRange);
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
                     @Override
                     public void completed(List<String> response) {
                         future.complete(response);
@@ -230,7 +234,8 @@ public void failed(Throwable throwable) {
     public CompletableFuture<List<String>> getListAsync(String namespace) {
         NamespaceName ns = NamespaceName.get(namespace);
         final CompletableFuture<List<String>> future = new 
CompletableFuture<>();
-        
asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()),
+        WebTarget path = namespacePath(ns);
+        asyncGetRequest(path,
                 new InvocationCallback<List<String>>() {
                     @Override
                     public void completed(List<String> response) {
@@ -253,4 +258,17 @@ private TopicName validateTopic(String topic) {
         return TopicName.get(topic);
     }
 
+    private WebTarget namespacePath(NamespaceName namespace, String... parts) {
+        final WebTarget base = namespace.isV2() ? adminV2NonPersistentTopics : 
adminNonPersistentTopics;
+        WebTarget namespacePath = base.path(namespace.toString());
+        namespacePath = WebTargets.addParts(namespacePath, parts);
+        return namespacePath;
+    }
+
+    private WebTarget topicPath(TopicName topic, String... parts) {
+        final WebTarget base = topic.isV2() ? adminV2NonPersistentTopics : 
adminNonPersistentTopics;
+        WebTarget topicPath = base.path(topic.getRestPath());
+        topicPath = WebTargets.addParts(topicPath, parts);
+        return topicPath;
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to