This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f37914 Provude v2 support for admin non persistent topics. (#1463)
7f37914 is described below
commit 7f379143feecc0a04de5701b63fa6758be63971e
Author: cckellogg <[email protected]>
AuthorDate: Thu Mar 29 12:09:02 2018 -0700
Provude v2 support for admin non persistent topics. (#1463)
---
.../admin/internal/NonPersistentTopicsImpl.java | 52 +++++++++++++++-------
1 file changed, 35 insertions(+), 17 deletions(-)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index d71a4de..ccd712a 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.NonPersistentTopics;
@@ -42,10 +41,12 @@ import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
public class NonPersistentTopicsImpl extends BaseResource implements
NonPersistentTopics {
private final WebTarget adminNonPersistentTopics;
+ private final WebTarget adminV2NonPersistentTopics;
public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
super(auth);
adminNonPersistentTopics = web.path("/admin/non-persistent");
+ adminV2NonPersistentTopics = web.path("/admin/v2/non-persistent");
}
@Override
@@ -63,10 +64,9 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic,
int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more
than 1");
- TopicName ds = validateTopic(topic);
- return asyncPutRequest(
-
adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
- Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "partitions");
+ return asyncPutRequest(path, Entity.entity(numPartitions,
MediaType.APPLICATION_JSON));
}
@Override
@@ -83,9 +83,10 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadataAsync(String topic) {
- TopicName ds = validateTopic(topic);
+ TopicName topicName = validateTopic(topic);
final CompletableFuture<PartitionedTopicMetadata> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
+ WebTarget path = topicPath(topicName, "partitions");
+ asyncGetRequest(path,
new InvocationCallback<PartitionedTopicMetadata>() {
@Override
@@ -115,9 +116,10 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public CompletableFuture<NonPersistentTopicStats> getStatsAsync(String
topic) {
- TopicName ds = validateTopic(topic);
+ TopicName topicName = validateTopic(topic);
final CompletableFuture<NonPersistentTopicStats> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("stats"),
+ WebTarget path = topicPath(topicName, "stats");
+ asyncGetRequest(path,
new InvocationCallback<NonPersistentTopicStats>() {
@Override
@@ -147,9 +149,10 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public CompletableFuture<PersistentTopicInternalStats>
getInternalStatsAsync(String topic) {
- TopicName ds = validateTopic(topic);
+ TopicName topicName = validateTopic(topic);
final CompletableFuture<PersistentTopicInternalStats> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("internalStats"),
+ WebTarget path = topicPath(topicName, "internalStats");
+ asyncGetRequest(path,
new InvocationCallback<PersistentTopicInternalStats>() {
@Override
@@ -179,9 +182,9 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public CompletableFuture<Void> unloadAsync(String topic) {
- TopicName ds = validateTopic(topic);
- return
asyncPutRequest(adminNonPersistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("unload"),
- Entity.entity("", MediaType.APPLICATION_JSON));
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "unload");
+ return asyncPutRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
}
@Override
@@ -200,8 +203,9 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
public CompletableFuture<List<String>> getListInBundleAsync(String
namespace, String bundleRange) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<List<String>> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
- .path(bundleRange), new InvocationCallback<List<String>>() {
+ WebTarget path = namespacePath(ns, bundleRange);
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> response) {
future.complete(response);
@@ -230,7 +234,8 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
public CompletableFuture<List<String>> getListAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<List<String>> future = new
CompletableFuture<>();
-
asyncGetRequest(adminNonPersistentTopics.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()),
+ WebTarget path = namespacePath(ns);
+ asyncGetRequest(path,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> response) {
@@ -253,4 +258,17 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
return TopicName.get(topic);
}
+ private WebTarget namespacePath(NamespaceName namespace, String... parts) {
+ final WebTarget base = namespace.isV2() ? adminV2NonPersistentTopics :
adminNonPersistentTopics;
+ WebTarget namespacePath = base.path(namespace.toString());
+ namespacePath = WebTargets.addParts(namespacePath, parts);
+ return namespacePath;
+ }
+
+ private WebTarget topicPath(TopicName topic, String... parts) {
+ final WebTarget base = topic.isV2() ? adminV2NonPersistentTopics :
adminNonPersistentTopics;
+ WebTarget topicPath = base.path(topic.getRestPath());
+ topicPath = WebTargets.addParts(topicPath, parts);
+ return topicPath;
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].