This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d06dae  [pulsar-admin] Add remove-subscription-types-enabled command 
for topic (#12983)
5d06dae is described below

commit 5d06dae578bccb81521e5d2662753aa84af16659
Author: Ruguo Yu <[email protected]>
AuthorDate: Thu Dec 2 14:01:52 2021 +0800

    [pulsar-admin] Add remove-subscription-types-enabled command for topic 
(#12983)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 11 ++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 30 ++++++++++++++++++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  6 +++++
 .../org/apache/pulsar/client/admin/Topics.java     | 17 ++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 22 ++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 13 ++++++++++
 7 files changed, 102 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 58e30da..c221f67 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4237,6 +4237,17 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
+    protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled() 
{
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setSubscriptionTypesEnabled(Lists.newArrayList());
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+                });
+    }
+
     protected CompletableFuture<Void> internalRemovePublishRate() {
         return getTopicPoliciesAsyncWithRetry(topicName)
             .thenCompose(op -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a804e7d..344c9fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3271,6 +3271,36 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             });
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
+    @ApiOperation(value = "Remove subscription types enabled for specified 
topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeSubscriptionTypesEnabled(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalRemoveSubscriptionTypesEnabled())
+                .thenRun(() -> {
+                    log.info("[{}] Successfully remove subscription types 
enabled: namespace={}, topic={}",
+                            clientAppId(),
+                            namespaceName,
+                            topicName.getLocalName());
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    
handleTopicPolicyException("removeSubscriptionTypesEnabled", ex, asyncResponse);
+                    return null;
+                });
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
     @ApiOperation(value = "Get subscribe rate configuration for specified 
topic.")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 1df0756..4acab8e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2404,6 +2404,12 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
             //restore
             
pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
         }
+
+        Awaitility.await().untilAsserted(() ->
+                
assertFalse(admin.topics().getSubscriptionTypesEnabled(topic).isEmpty()));
+        admin.topics().removeSubscriptionTypesEnabled(topic);
+        Awaitility.await().untilAsserted(() ->
+                
assertTrue(admin.topics().getSubscriptionTypesEnabled(topic).isEmpty()));
     }
 
     @Test(timeOut = 30000)
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 8efd371..549bfe1 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -3427,6 +3427,23 @@ public interface Topics {
     CompletableFuture<Set<SubscriptionType>> 
getSubscriptionTypesEnabledAsync(String topic);
 
     /**
+     * Remove subscription types enabled for a topic.
+     *
+     * @param topic Topic name
+     * @throws PulsarAdminException Unexpected error
+     */
+    @Deprecated
+    void removeSubscriptionTypesEnabled(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Remove subscription types enabled for a topic asynchronously.
+     *
+     * @param topic Topic name
+     */
+    @Deprecated
+    CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String topic);
+
+    /**
      * Set topic-subscribe-rate (topic will limit by subscribeRate).
      *
      * @param topic
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index a41d65f..7be23b9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -3634,6 +3634,28 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public void removeSubscriptionTypesEnabled(String topic) throws 
PulsarAdminException {
+        try {
+            removeSubscriptionTypesEnabledAsync(topic)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String 
topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public DispatchRate getReplicatorDispatchRate(String topic) throws 
PulsarAdminException {
         return getReplicatorDispatchRate(topic, false);
     }
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 3b8387e..545d08b 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1066,6 +1066,9 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("get-subscription-types-enabled 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("remove-subscription-types-enabled 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("set-replicator-dispatch-rate 
persistent://myprop/clust/ns1/ds1 -md 10 -bd 11 -dt 12"));
         
verify(mockTopics).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
                 DispatchRate.builder()
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2807249..15da733 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -198,6 +198,7 @@ public class CmdTopics extends CmdBase {
 
         jcommander.addCommand("set-subscription-types-enabled", new 
SetSubscriptionTypesEnabled());
         jcommander.addCommand("get-subscription-types-enabled", new 
GetSubscriptionTypesEnabled());
+        jcommander.addCommand("remove-subscription-types-enabled", new 
RemoveSubscriptionTypesEnabled());
 
         //deprecated commands
         jcommander.addCommand("get-maxProducers", new GetMaxProducers());
@@ -1994,6 +1995,18 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove subscription types enabled for a 
topic")
+    private class RemoveSubscriptionTypesEnabled extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopics().removeSubscriptionTypesEnabled(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get compaction threshold for a topic")
     private class GetCompactionThreshold extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)

Reply via email to