This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4709f3aea Fix can not disable and remove max consumer per subscription 
(#10070)
4709f3aea is described below

commit 4709f3aeaed1bc6a68a9a683c95e0398c940cd54
Author: feynmanlin <[email protected]>
AuthorDate: Wed Mar 31 22:03:42 2021 +0800

    Fix can not disable and remove max consumer per subscription (#10070)
    
    ### Motivation
    1)Now, we cannot disable `MaxConsumersPerSubscription` in any level of 
Policy
    2) The Namespace level MaxConsumersPerSubscription cannot be deleted as 
long as it is set
    3)The default value of the namespace level is incorrect, and the broker 
level data will be returned
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  5 --
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  6 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 15 ++++-
 .../broker/service/AbstractBaseDispatcher.java     |  4 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 15 +++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 66 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 16 +++++-
 .../client/admin/internal/NamespacesImpl.java      | 26 ++++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 +
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++
 .../pulsar/common/policies/data/Policies.java      |  6 +-
 11 files changed, 160 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 063120b..7170f0f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -339,9 +339,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
             BundlesData bundleData = 
NamespaceBundleFactory.getBundlesData(bundles);
             policies.bundles = bundleData != null ? bundleData : 
policies.bundles;
 
-            // hydrate the namespace polices
-            mergeNamespaceWithDefaults(policies, namespace, policyPath);
-
             return policies;
         } catch (RestException re) {
             throw re;
@@ -371,8 +368,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
                         return FutureUtil.failedFuture(new RestException(e));
                     }
                     policies.get().bundles = bundleData != null ? bundleData : 
policies.get().bundles;
-                    // hydrate the namespace polices
-                    mergeNamespaceWithDefaults(policies.get(), namespace, 
policyPath);
                     return CompletableFuture.completedFuture(policies.get());
                 });
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7f582ea..0b53838 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2089,17 +2089,17 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
-    protected int internalGetMaxConsumersPerSubscription() {
+    protected Integer internalGetMaxConsumersPerSubscription() {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
         return 
getNamespacePolicies(namespaceName).max_consumers_per_subscription;
     }
 
-    protected void internalSetMaxConsumersPerSubscription(int 
maxConsumersPerSubscription) {
+    protected void internalSetMaxConsumersPerSubscription(Integer 
maxConsumersPerSubscription) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
-            if (maxConsumersPerSubscription < 0) {
+            if (maxConsumersPerSubscription != null && 
maxConsumersPerSubscription < 0) {
                 throw new RestException(Status.PRECONDITION_FAILED,
                         "maxConsumersPerSubscription must be 0 or more");
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 72fc840..9afcb24 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1138,7 +1138,7 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get maxConsumersPerSubscription config on a 
namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public int getMaxConsumersPerSubscription(@PathParam("tenant") String 
tenant,
+    public Integer getMaxConsumersPerSubscription(@PathParam("tenant") String 
tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
         return internalGetMaxConsumersPerSubscription();
@@ -1160,6 +1160,19 @@ public class Namespaces extends NamespacesBase {
         internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a 
namespace.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription 
value is not valid")})
+    public void removeMaxConsumersPerSubscription(@PathParam("tenant") String 
tenant,
+                                               @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetMaxConsumersPerSubscription(null);
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer")
     @ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a 
namespace.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d57fc7b..7610677 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -179,7 +179,9 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
         }
 
         if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies != null && 
policies.max_consumers_per_subscription > 0
+            maxConsumersPerSubscription = policies != null
+                    && policies.max_consumers_per_subscription != null
+                    && policies.max_consumers_per_subscription >= 0
                     ? policies.max_consumers_per_subscription :
                     
brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 7087c1a..cca01e0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -609,6 +609,21 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         producer.close();
     }
 
+
+    @Test(timeOut = 20000)
+    public void testMaxConsumersOnSubApi() throws Exception {
+        final String namespace = "prop-xyz/ns1";
+        
assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+        admin.namespaces().setMaxConsumersPerSubscription(namespace, 10);
+        Awaitility.await().untilAsserted(() -> {
+            
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+            
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(),
 10);
+        });
+        admin.namespaces().removeMaxConsumersPerSubscription(namespace);
+        Awaitility.await().untilAsserted(() ->
+                admin.namespaces().getMaxConsumersPerSubscription(namespace));
+    }
+
     /**
      * It verifies that pulsar with different load-manager generates different 
load-report and returned by admin-api
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 35527e1..023e604 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1381,6 +1381,72 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getSubscribeRate(topic, true), 
brokerPolicy);
     }
 
+    @Test(timeOut = 30000)
+    public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        int maxConsumerInBroker = 1;
+        int maxConsumerInNs = 2;
+        int maxConsumerInTopic = 4;
+        String mySub = "my-sub";
+        conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().until(() ->
+                
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        List<Consumer<String>> consumerList = new ArrayList<>();
+        ConsumerBuilder<String> builder = 
pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic).subscriptionName(mySub);
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 
maxConsumerInNs);
+        Awaitility.await().untilAsserted(() ->
+                
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+        //disabled
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0);
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(),
 0));
+        consumerList.add(builder.subscribe());
+        //set topic-level
+        admin.topics().setMaxConsumersPerSubscription(topic, 
maxConsumerInTopic);
+        Awaitility.await().untilAsserted(() ->
+                
assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic)));
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+        //remove topic policies
+        admin.topics().removeMaxConsumersPerSubscription(topic);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));
+        consumerList.add(builder.subscribe());
+        //remove namespace policies, then use broker-level
+        admin.namespaces().removeMaxConsumersPerSubscription(myNamespace);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+
+        for (Consumer<String> consumer : consumerList) {
+            consumer.close();
+        }
+    }
+
     @Test
     public void testRemoveSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 3c61ce8..703269a 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2903,7 +2903,7 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    int getMaxConsumersPerSubscription(String namespace) throws 
PulsarAdminException;
+    Integer getMaxConsumersPerSubscription(String namespace) throws 
PulsarAdminException;
 
     /**
      * Get the maxConsumersPerSubscription for a namespace asynchronously.
@@ -2959,6 +2959,20 @@ public interface Namespaces {
     CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String 
namespace, int maxConsumersPerSubscription);
 
     /**
+     * Remove maxConsumersPerSubscription for a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removeMaxConsumersPerSubscription(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Remove maxConsumersPerSubscription for a namespace asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String 
namespace);
+
+    /**
      * Get the maxUnackedMessagesPerConsumer for a namespace.
      * <p/>
      * Response example:
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index fd2c422..e110645 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2543,7 +2543,7 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
-    public int getMaxConsumersPerSubscription(String namespace) throws 
PulsarAdminException {
+    public Integer getMaxConsumersPerSubscription(String namespace) throws 
PulsarAdminException {
         try {
             return getMaxConsumersPerSubscriptionAsync(namespace).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -2602,6 +2602,30 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
+    public void removeMaxConsumersPerSubscription(String namespace)
+            throws PulsarAdminException {
+        try {
+            removeMaxConsumersPerSubscriptionAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(
+            String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxUnackedMessagesPerConsumer(String namespace) throws 
PulsarAdminException {
         try {
             return getMaxUnackedMessagesPerConsumerAsync(namespace).
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7c03f2f..93d547f 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -536,6 +536,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-max-consumers-per-subscription 
myprop/clust/ns1"));
         
verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
 
+        namespaces.run(split("remove-max-consumers-per-subscription 
myprop/clust/ns1"));
+        
verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1");
+
         namespaces.run(split("set-max-consumers-per-subscription 
myprop/clust/ns1 -c 3"));
         
verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a749271..7a2e52d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1475,6 +1475,18 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove maxConsumersPerSubscription for a 
namespace")
+    private class RemoveMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            
getAdmin().namespaces().removeMaxConsumersPerSubscription(namespace);
+        }
+    }
+
     @Parameters(commandDescription = "Set maxConsumersPerSubscription for a 
namespace")
     private class SetMaxConsumersPerSubscription extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -2190,6 +2202,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-max-consumers-per-subscription", new 
GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new 
SetMaxConsumersPerSubscription());
+        jcommander.addCommand("remove-max-consumers-per-subscription", new 
RemoveMaxConsumersPerSubscription());
 
         jcommander.addCommand("get-max-unacked-messages-per-subscription", new 
GetMaxUnackedMessagesPerSubscription());
         jcommander.addCommand("set-max-unacked-messages-per-subscription", new 
SetMaxUnackedMessagesPerSubscription());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 184c58d..72d6f6f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -86,7 +86,7 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public Integer max_consumers_per_topic = null;
     @SuppressWarnings("checkstyle:MemberName")
-    public int max_consumers_per_subscription = 0;
+    public Integer max_consumers_per_subscription = null;
     @SuppressWarnings("checkstyle:MemberName")
     public Integer max_unacked_messages_per_consumer = null;
     @SuppressWarnings("checkstyle:MemberName")
@@ -178,8 +178,8 @@ public class Policies {
                     && Objects.equals(max_consumers_per_topic, 
other.max_consumers_per_topic)
                     && Objects.equals(max_unacked_messages_per_consumer, 
other.max_unacked_messages_per_consumer)
                     && Objects.equals(max_unacked_messages_per_subscription, 
max_unacked_messages_per_subscription)
-                    && max_consumers_per_subscription == 
other.max_consumers_per_subscription
-                    && compaction_threshold == other.compaction_threshold
+                    && Objects.equals(max_consumers_per_subscription, 
max_consumers_per_subscription)
+                    && Objects.equals(compaction_threshold, 
compaction_threshold)
                     && offload_threshold == other.offload_threshold
                     && Objects.equals(offload_deletion_lag_ms, 
other.offload_deletion_lag_ms)
                     && schema_auto_update_compatibility_strategy == 
other.schema_auto_update_compatibility_strategy

Reply via email to