This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new bff7caa  Fix cannot consume using older subscriptions if max 
subscription limi… (#9758)
bff7caa is described below

commit bff7caa23043e8a661e74fd081d7bbbaec3af116
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 1 21:52:10 2021 +0800

    Fix cannot consume using older subscriptions if max subscription limi… 
(#9758)
    
    Fixes #9755
    
    ### Motivation
    If maxSubscriptionsPerTopic is set in the broker configuration (or as a 
namespace policy), then once the limit is reached the broker rejects new 
consumers even on subscriptions that already exist on the topic.
    
    ### Modifications
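    The max-subscriptions check now returns early when the requested 
subscription already exists on the topic, so existing subscriptions are not 
restricted; only the creation of new subscriptions counts against the limit.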
    
    ### Verifying this change
    Covered by the new test 
TopicPoliciesTest#testMaxSubscriptionsPerTopicWithExistingSubs.
    
    (cherry picked from commit 52a28635c0cc190118c9027d248754b8c3e3abe7)
---
 .../broker/service/persistent/PersistentTopic.java | 12 ++++--
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 43 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 154471d..2a1cc89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -66,6 +66,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -721,8 +722,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private CompletableFuture<Subscription> getDurableSubscription(String 
subscriptionName,
             InitialPosition initialPosition, long 
startMessageRollbackDurationSec, boolean replicated) {
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
-
-        if (checkMaxSubscriptionsPerTopicExceed()) {
+        if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
             subscriptionFuture.completeExceptionally(new 
RestException(Response.Status.PRECONDITION_FAILED,
                     "Exceed the maximum number of subscriptions of the topic: 
" + topic));
             return subscriptionFuture;
@@ -766,7 +766,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", 
topic, subscriptionName, startMessageId);
 
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
-        if (checkMaxSubscriptionsPerTopicExceed()) {
+        if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
             subscriptionFuture.completeExceptionally(new 
RestException(Response.Status.PRECONDITION_FAILED,
                     "Exceed the maximum number of subscriptions of the topic: 
" + topic));
             return subscriptionFuture;
@@ -2731,7 +2731,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return messageDeduplication;
     }
 
-    private boolean checkMaxSubscriptionsPerTopicExceed() {
+    private boolean checkMaxSubscriptionsPerTopicExceed(String 
subscriptionName) {
+        //Existing subscriptions are not affected
+        if (StringUtils.isNotEmpty(subscriptionName) && 
getSubscription(subscriptionName) != null) {
+            return false;
+        }
         TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
         Integer maxSubsPerTopic = null;
         if (topicPolicies != null && 
topicPolicies.isMaxSubscriptionsPerTopicSet()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 5c3d9ff..a2fc8fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1342,6 +1342,49 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 20000)
+    public void testMaxSubscriptionsPerTopicWithExistingSubs() throws 
Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        // init cache
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        // Set topic-level max subscriptions
+        final int topicLevelMaxSubNum = 2;
+        admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> 
pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != 
null);
+        List<Consumer<String>> consumerList = new ArrayList<>();
+        String subName = "my-sub-";
+        for (int i = 0; i < topicLevelMaxSubNum; i++) {
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName(subName + i)
+                    .topic(topic).subscribe();
+            consumerList.add(consumer);
+        }
+        // should fail
+        try (PulsarClient client = PulsarClient.builder().operationTimeout(2, 
TimeUnit.SECONDS)
+                .serviceUrl(brokerUrl.toString()).build()) {
+            consumerList.add(client.newConsumer(Schema.STRING)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .topic(topic).subscribe());
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertEquals(consumerList.size(), topicLevelMaxSubNum);
+        }
+        //create a consumer with the same subscription name, it should succeed
+        pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName + "0")
+                .topic(topic).subscribe().close();
+
+        //Clean up
+        for (Consumer<String> c : consumerList) {
+            c.close();
+        }
+    }
+
+    @Test(timeOut = 20000)
     public void testMaxSubscriptionsPerTopic() throws Exception {
         int brokerLevelMaxSub = 4;
         conf.setMaxSubscriptionsPerTopic(4);

Reply via email to