anvinjain opened a new issue #9755:
URL: https://github.com/apache/pulsar/issues/9755
**Describe the bug**
If maxSubscriptionsPerTopic is set in the broker (or even as a namespace
policy), once this limit is reached, broker does not allow consumers to be
created for even older subscriptions.
**To Reproduce**
Steps to reproduce the behavior:
1. Broker config: `maxSubscriptionsPerTopic=5`
2. Create a topic with which we reproduce this behaviour:
`./pulsar-admin topics create-partitioned-topic -p 5
persistent://public/default/maxsub-topic`
2. Start producing to this topic: `./pulsar-perf produce -m 1000 -r 1
persistent://public/default/maxsub-topic`
3. Create 5 subscriptions (and an associated consumer) using pulsar-perf or
pulsar-client:
`./pulsar-client consume -s s1 -t Shared
persistent://public/default/maxsub-topic`
4. Observe that subscriptions are created and message is successfully
consumed every time.
```
./pulsar-admin topics subscriptions persistent://public/default/maxsub-topic
"s1"
"s5"
"s2"
"s3"
"s4"
```
4. **Consume from an existing subscription again. Error is observed that
maximum number of subscriptions of the topic have exceeded. This is not
expected since this is an already existing subscription.** The same error is
also observed when consuming from a new subscription but that is expected.
**./pulsar-client consume -s s1 -t Shared
persistent://public/default/maxsub-topic**
```
00:05:28.970 [pulsar-client-io-1-1] INFO
org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xa5da68bb,
L:/172.20.208.225:52509 - R:/10.24.13.161:6650]] Connected to server
00:05:30.065 [pulsar-client-io-1-1] INFO
org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar
consumer status recorder with config: {
"topicNames" : [ ],
"topicsPattern" : null,
"subscriptionName" : "s1",
"subscriptionType" : "Shared",
"subscriptionMode" : "Durable",
"receiverQueueSize" : 1000,
"acknowledgementsGroupTimeMicros" : 100000,
"negativeAckRedeliveryDelayMicros" : 60000000,
"maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
"consumerName" : "1884c",
"ackTimeoutMillis" : 0,
"tickDurationMillis" : 1000,
"priorityLevel" : 0,
"maxPendingChuckedMessage" : 10,
"autoAckOldestChunkedMessageOnQueueFull" : false,
"expireTimeOfIncompleteChunkedMessageMillis" : 60000,
"cryptoFailureAction" : "FAIL",
"properties" : { },
"readCompacted" : false,
"subscriptionInitialPosition" : "Latest",
"patternAutoDiscoveryPeriod" : 60,
"regexSubscriptionMode" : "PersistentOnly",
"deadLetterPolicy" : null,
"retryEnable" : false,
"autoUpdatePartitions" : true,
"autoUpdatePartitionsIntervalSeconds" : 60,
"replicateSubscriptionState" : false,
"resetIncludeHead" : false,
"keySharedPolicy" : null,
"batchIndexAckEnabled" : false
}
00:05:30.070 [pulsar-client-io-1-1] INFO
org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config:
{
"serviceUrl" : "pulsar://10.24.13.161:6650",
"authPluginClassName" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxLookupRedirects" : 20,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000,
"listenerName" : null,
"useKeyStoreTls" : false,
"sslProvider" : null,
"tlsTrustStoreType" : "JKS",
"tlsTrustStorePath" : "",
"tlsTrustStorePassword" : "",
"tlsCiphers" : [ ],
"tlsProtocols" : [ ],
"proxyServiceUrl" : null,
"proxyProtocol" : null,
"enableTransaction" : false
}
...
...
00:05:30.216 [pulsar-client-io-1-1] INFO
org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x2a6585b8,
L:/172.20.208.225:52512 - R:/10.24.13.161:6650]] Connected to server
00:05:30.217 [pulsar-client-io-1-1] INFO
org.apache.pulsar.client.impl.ClientCnx - [id: 0x2a6585b8,
L:/172.20.208.225:52512 - R:/10.24.13.161:6650] Connected through proxy to
target broker at 10.48.50.65:6650
...
...
00:05:30.379 [pulsar-client-io-1-1] INFO
org.apache.pulsar.client.impl.ConsumerImpl -
[persistent://public/default/maxsub-topic-partition-4][s1] Subscribing to topic
on cnx [id: 0x5b223f9b, L:/172.20.208.225:52511 - R:/10.24.13.161:6650],
consumerId 4
...
...
00:05:30.501 [pulsar-client-io-1-1] WARN
org.apache.pulsar.client.impl.ClientCnx - [id: 0x2a6585b8,
L:/172.20.208.225:52512 - R:/10.24.13.161:6650] Received error from server:
java.util.concurrent.CompletionException:
org.apache.pulsar.common.util.RestException: Exceed the maximum number of
subscriptions of the topic: persistent://public/default/maxsub-topic-partition-1
00:05:30.503 [pulsar-client-io-1-1] WARN
org.apache.pulsar.client.impl.ConsumerImpl -
[persistent://public/default/maxsub-topic-partition-1][s1] Failed to subscribe
to topic on /10.24.13.161:6650
00:05:30.503 [pulsar-client-io-1-1] WARN
org.apache.pulsar.client.impl.ConnectionHandler -
[persistent://public/default/maxsub-topic-partition-1] [s1] Could not get
connection to broker: java.util.concurrent.CompletionException:
org.apache.pulsar.common.util.RestException: Exceed the maximum number of
subscriptions of the topic:
persistent://public/default/maxsub-topic-partition-1 -- Will try again in 0.1 s
```
**Expected behavior**
Expected that consumer with existing subscriptions would get attached
without any error.
**Desktop (please complete the following information):**
- streamnative/pulsar image used for setting up 2.7.0
**Additional context**
The bug seems to be in getDurableSubscription method of
PersistentTopic.java. Here the check of exceeding subs is done before we
establish whether this is a new or existing subscription:
https://github.com/apache/pulsar/blob/a8b921cf15c0a0f652f1d9f62a6481efea243881/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L785-L806
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]