This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new bff7caa Fix cannot consume using older subscriptions if max
subscription limi… (#9758)
bff7caa is described below
commit bff7caa23043e8a661e74fd081d7bbbaec3af116
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 1 21:52:10 2021 +0800
Fix cannot consume using older subscriptions if max subscription limi…
(#9758)
Fixes #9755
### Motivation
If maxSubscriptionsPerTopic is set in the broker (or even as a namespace
policy), once this limit is reached, broker does not allow consumers to be
created for even older subscriptions.
### Modifications
Existing subscriptions are not restricted
### Verifying this change
TopicPoliciesTest#testMaxSubscriptionsPerTopicWithExistingSubs
(cherry picked from commit 52a28635c0cc190118c9027d248754b8c3e3abe7)
---
.../broker/service/persistent/PersistentTopic.java | 12 ++++--
.../pulsar/broker/admin/TopicPoliciesTest.java | 43 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 154471d..2a1cc89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -66,6 +66,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -721,8 +722,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private CompletableFuture<Subscription> getDurableSubscription(String
subscriptionName,
InitialPosition initialPosition, long
startMessageRollbackDurationSec, boolean replicated) {
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
-
- if (checkMaxSubscriptionsPerTopicExceed()) {
+ if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
subscriptionFuture.completeExceptionally(new
RestException(Response.Status.PRECONDITION_FAILED,
"Exceed the maximum number of subscriptions of the topic:
" + topic));
return subscriptionFuture;
@@ -766,7 +766,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
log.info("[{}][{}] Creating non-durable subscription at msg id {}",
topic, subscriptionName, startMessageId);
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
- if (checkMaxSubscriptionsPerTopicExceed()) {
+ if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
subscriptionFuture.completeExceptionally(new
RestException(Response.Status.PRECONDITION_FAILED,
"Exceed the maximum number of subscriptions of the topic:
" + topic));
return subscriptionFuture;
@@ -2731,7 +2731,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return messageDeduplication;
}
- private boolean checkMaxSubscriptionsPerTopicExceed() {
+ private boolean checkMaxSubscriptionsPerTopicExceed(String
subscriptionName) {
+ //Existing subscriptions are not affected
+ if (StringUtils.isNotEmpty(subscriptionName) &&
getSubscription(subscriptionName) != null) {
+ return false;
+ }
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
Integer maxSubsPerTopic = null;
if (topicPolicies != null &&
topicPolicies.isMaxSubscriptionsPerTopicSet()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 5c3d9ff..a2fc8fb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1342,6 +1342,49 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test(timeOut = 20000)
+ public void testMaxSubscriptionsPerTopicWithExistingSubs() throws
Exception {
+ final String topic = "persistent://" + myNamespace + "/test-" +
UUID.randomUUID();
+ // init cache
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ // Set topic-level max subscriptions
+ final int topicLevelMaxSubNum = 2;
+ admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ ->
pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) !=
null);
+ List<Consumer<String>> consumerList = new ArrayList<>();
+ String subName = "my-sub-";
+ for (int i = 0; i < topicLevelMaxSubNum; i++) {
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subName + i)
+ .topic(topic).subscribe();
+ consumerList.add(consumer);
+ }
+ // should fail
+ try (PulsarClient client = PulsarClient.builder().operationTimeout(2,
TimeUnit.SECONDS)
+ .serviceUrl(brokerUrl.toString()).build()) {
+ consumerList.add(client.newConsumer(Schema.STRING)
+ .subscriptionName(UUID.randomUUID().toString())
+ .topic(topic).subscribe());
+ fail("should fail");
+ } catch (PulsarClientException ignore) {
+ assertEquals(consumerList.size(), topicLevelMaxSubNum);
+ }
+ //create a consumer with the same subscription name, it should succeed
+ pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subName + "0")
+ .topic(topic).subscribe().close();
+
+ //Clean up
+ for (Consumer<String> c : consumerList) {
+ c.close();
+ }
+ }
+
+ @Test(timeOut = 20000)
public void testMaxSubscriptionsPerTopic() throws Exception {
int brokerLevelMaxSub = 4;
conf.setMaxSubscriptionsPerTopic(4);