This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ff6b054b515ea7b89c2942637d630aac1ac60a65
Author: feynmanlin <[email protected]>
AuthorDate: Tue Mar 23 11:56:15 2021 +0800

    Fix the useless retry when the maximum number of subscriptions is reached (#9991)
    
    When the maximum number of subscriptions is reached, the client keeps
    retrying the subscribe request. Because the default value of
    operationTimeout is long, the client gets stuck for a long time.
    If a NotAllowedException is returned instead, the client does not
    retry and fails immediately.
    
    The added test verifies that the subscribe attempt fails within a short time.
    
    (cherry picked from commit 31c5fb52bc841e40d678bd3787269160b7d14c12)
---
 .../broker/service/persistent/PersistentTopic.java | 10 ++++--
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 39 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7ae2543..5bee031 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -131,7 +131,6 @@ import 
org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
@@ -697,6 +696,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             log.error("[{}] Failed to create subscription: {} error: {}", 
topic, subscriptionName, ex);
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
+            if (ex.getCause() instanceof NotAllowedException) {
+                future.completeExceptionally(ex.getCause());
+            } else {
+                future.completeExceptionally(new PersistenceException(ex));
+            }
             return null;
         });
 
@@ -723,7 +727,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             InitialPosition initialPosition, long 
startMessageRollbackDurationSec, boolean replicated) {
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
         if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
-            subscriptionFuture.completeExceptionally(new 
RestException(Response.Status.PRECONDITION_FAILED,
+            subscriptionFuture.completeExceptionally(new NotAllowedException(
                     "Exceed the maximum number of subscriptions of the topic: 
" + topic));
             return subscriptionFuture;
         }
@@ -767,7 +771,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
         if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
-            subscriptionFuture.completeExceptionally(new 
RestException(Response.Status.PRECONDITION_FAILED,
+            subscriptionFuture.completeExceptionally(new NotAllowedException(
                     "Exceed the maximum number of subscriptions of the topic: 
" + topic));
             return subscriptionFuture;
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 9e75f51..65dcfa8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -29,11 +29,13 @@ import 
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -1317,6 +1319,43 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 20000)
+    public void testMaxSubscriptionsFailFast() throws Exception {
+        doTestMaxSubscriptionsFailFast(SubscriptionMode.Durable);
+        doTestMaxSubscriptionsFailFast(SubscriptionMode.NonDurable);
+    }
+
+    private void doTestMaxSubscriptionsFailFast(SubscriptionMode subMode) 
throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        // init cache
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        int maxSubInNamespace = 2;
+        List<Consumer> consumers = new ArrayList<>();
+        ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer().subscriptionMode(subMode)
+                .subscriptionType(SubscriptionType.Shared).topic(topic);
+        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, 
maxSubInNamespace);
+        Awaitility.await().untilAsserted(()
+                -> 
assertNotNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)));
+        for (int i = 0; i < maxSubInNamespace; i++) {
+            consumers.add(consumerBuilder.subscriptionName("sub" + 
i).subscribe());
+        }
+        long start = System.currentTimeMillis();
+        try {
+            consumerBuilder.subscriptionName("sub").subscribe();
+            fail("should fail");
+        } catch (PulsarClientException e) {
+            assertTrue(e instanceof PulsarClientException.NotAllowedException);
+        }
+        //fail fast
+        assertTrue(System.currentTimeMillis() - start < 3000);
+        //clean
+        for (Consumer consumer : consumers) {
+            consumer.close();
+        }
+    }
+
+    @Test(timeOut = 20000)
     public void testMaxSubscriptionsPerTopicApi() throws Exception {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
         // init cache

Reply via email to