This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5d06dae [pulsar-admin] Add remove-subscription-types-enabled command
for topic (#12983)
5d06dae is described below
commit 5d06dae578bccb81521e5d2662753aa84af16659
Author: Ruguo Yu <[email protected]>
AuthorDate: Thu Dec 2 14:01:52 2021 +0800
[pulsar-admin] Add remove-subscription-types-enabled command for topic
(#12983)
---
.../broker/admin/impl/PersistentTopicsBase.java | 11 ++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 30 ++++++++++++++++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 6 +++++
.../org/apache/pulsar/client/admin/Topics.java | 17 ++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 22 ++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 13 ++++++++++
7 files changed, 102 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 58e30da..c221f67 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4237,6 +4237,17 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled()
{
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setSubscriptionTypesEnabled(Lists.newArrayList());
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
op.get());
+ });
+ }
+
protected CompletableFuture<Void> internalRemovePublishRate() {
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a804e7d..344c9fa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3271,6 +3271,36 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
+ @ApiOperation(value = "Remove subscription types enabled for specified
topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the
topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeSubscriptionTypesEnabled(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> internalRemoveSubscriptionTypesEnabled())
+ .thenRun(() -> {
+ log.info("[{}] Successfully remove subscription types
enabled: namespace={}, topic={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+
handleTopicPolicyException("removeSubscriptionTypesEnabled", ex, asyncResponse);
+ return null;
+ });
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/subscribeRate")
@ApiOperation(value = "Get subscribe rate configuration for specified
topic.")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 1df0756..4acab8e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2404,6 +2404,12 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
//restore
pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
}
+
+ Awaitility.await().untilAsserted(() ->
+
assertFalse(admin.topics().getSubscriptionTypesEnabled(topic).isEmpty()));
+ admin.topics().removeSubscriptionTypesEnabled(topic);
+ Awaitility.await().untilAsserted(() ->
+
assertTrue(admin.topics().getSubscriptionTypesEnabled(topic).isEmpty()));
}
@Test(timeOut = 30000)
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 8efd371..549bfe1 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -3427,6 +3427,23 @@ public interface Topics {
CompletableFuture<Set<SubscriptionType>>
getSubscriptionTypesEnabledAsync(String topic);
/**
+ * Remove subscription types enabled for a topic.
+ *
+ * @param topic Topic name
+ * @throws PulsarAdminException Unexpected error
+ */
+ @Deprecated
+ void removeSubscriptionTypesEnabled(String topic) throws
PulsarAdminException;
+
+ /**
+ * Remove subscription types enabled for a topic asynchronously.
+ *
+ * @param topic Topic name
+ */
+ @Deprecated
+ CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String topic);
+
+ /**
* Set topic-subscribe-rate (topic will limit by subscribeRate).
*
* @param topic
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index a41d65f..7be23b9 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -3634,6 +3634,28 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public void removeSubscriptionTypesEnabled(String topic) throws
PulsarAdminException {
+ try {
+ removeSubscriptionTypesEnabledAsync(topic)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String
topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public DispatchRate getReplicatorDispatchRate(String topic) throws
PulsarAdminException {
return getReplicatorDispatchRate(topic, false);
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 3b8387e..545d08b 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1066,6 +1066,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-subscription-types-enabled
persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("remove-subscription-types-enabled
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("set-replicator-dispatch-rate
persistent://myprop/clust/ns1/ds1 -md 10 -bd 11 -dt 12"));
verify(mockTopics).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
DispatchRate.builder()
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2807249..15da733 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -198,6 +198,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("set-subscription-types-enabled", new
SetSubscriptionTypesEnabled());
jcommander.addCommand("get-subscription-types-enabled", new
GetSubscriptionTypesEnabled());
+ jcommander.addCommand("remove-subscription-types-enabled", new
RemoveSubscriptionTypesEnabled());
//deprecated commands
jcommander.addCommand("get-maxProducers", new GetMaxProducers());
@@ -1994,6 +1995,18 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove subscription types enabled for a
topic")
+ private class RemoveSubscriptionTypesEnabled extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopics().removeSubscriptionTypesEnabled(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)