This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ff6b054b515ea7b89c2942637d630aac1ac60a65 Author: feynmanlin <[email protected]> AuthorDate: Tue Mar 23 11:56:15 2021 +0800 Fix the useless retry when the maximum number of subscriptions is reached (#9991) When the maximum number of subscriptions is reached, due to the long default value of operationTimeout, the client will keep retrying and get stuck for a long time. If a NotAllowedException is returned, the client will not continue to retry and will fail directly. Can the test fail in a short time (cherry picked from commit 31c5fb52bc841e40d678bd3787269160b7d14c12) --- .../broker/service/persistent/PersistentTopic.java | 10 ++++-- .../pulsar/broker/admin/TopicPoliciesTest.java | 39 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7ae2543..5bee031 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -131,7 +131,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; @@ -697,6 +696,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex); USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); future.completeExceptionally(new PersistenceException(ex)); + if (ex.getCause() instanceof NotAllowedException) { + future.completeExceptionally(ex.getCause()); + } else { + future.completeExceptionally(new PersistenceException(ex)); + } return null; }); @@ -723,7 +727,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) { CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { - subscriptionFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, + subscriptionFuture.completeExceptionally(new NotAllowedException( "Exceed the maximum number of subscriptions of the topic: " + topic)); return subscriptionFuture; } @@ -767,7 +771,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { - subscriptionFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, + subscriptionFuture.completeExceptionally(new NotAllowedException( "Exceed the maximum number of subscriptions of the topic: " + topic)); return subscriptionFuture; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 9e75f51..65dcfa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -29,11 +29,13 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -1317,6 +1319,43 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test(timeOut = 20000) + public void testMaxSubscriptionsFailFast() throws Exception { + doTestMaxSubscriptionsFailFast(SubscriptionMode.Durable); + doTestMaxSubscriptionsFailFast(SubscriptionMode.NonDurable); + } + + private void doTestMaxSubscriptionsFailFast(SubscriptionMode subMode) throws Exception { + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + // init cache + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + int maxSubInNamespace = 2; + List<Consumer> consumers = new ArrayList<>(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer().subscriptionMode(subMode) + .subscriptionType(SubscriptionType.Shared).topic(topic); + admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSubInNamespace); + Awaitility.await().untilAsserted(() + -> assertNotNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace))); + for (int i = 0; i < maxSubInNamespace; i++) { + consumers.add(consumerBuilder.subscriptionName("sub" + i).subscribe()); + } + long start = System.currentTimeMillis(); + try { + consumerBuilder.subscriptionName("sub").subscribe(); + fail("should fail"); + } catch (PulsarClientException e) { + assertTrue(e instanceof PulsarClientException.NotAllowedException); + } + //fail fast + assertTrue(System.currentTimeMillis() - start < 3000); + //clean + for (Consumer consumer : consumers) { + consumer.close(); + } + } + + @Test(timeOut = 20000) public void testMaxSubscriptionsPerTopicApi() throws Exception { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); // init cache
