git-enzo opened a new issue #8214:
URL: https://github.com/apache/pulsar/issues/8214
**Describe the bug**
When we pause a consumer which subscribes to multiple topics, for example by
parsing a regular expression, and then the new producer sends a message to the
new topic that the consumer discovers/subscribes, then messages from that topic
will be "consumable" by the consumer during pause.
**To Reproduce**
Steps to reproduce the behavior:
1. Create a ```multi-topic-consumer``` which subscribes to multiple topics
by parsing regular expression, for example
```persistent://public/default/foo.*```. This type of consumer periodically
checks if new topics are available based on regexp
(patternAutoDiscoveryPeriod). Let's say, we set this property to 10 sec. For
example:
```
Pattern topicsPattern = Pattern.compile("persistent://public/default/foo.*");
pulsarClient
.newConsumer()
.subscriptionName("subscription")
.subscriptionType(SubscriptionType.Shared)
.topicsPattern(topicsPattern)
.patternAutoDiscoveryPeriod(10, TimeUnit.SECONDS)
.receiverQueueSize(1)
.subscribe();
```
2. Create ```producer1``` which sends messages to topic
```persistent://public/default/foo1```. If ```multi-topic-consumer``` discovers
this new topic (max after 10 seconds in our case), then messages will be
consumed by consumer. Example producer:
```
Producer<String> producer1 = pulsarClient
.newProducer()
.topic("foo1")
.create();
```
3. Now, if ```producer1``` sends messages, a ```multi-topic-consumer``` can
receive them.
4. Let's **pause** ```multi-topic-consumer``` by invoking ```pause()```. In
this case consumer is a PatternMultiTopicsConsumerImpl type. That means
underneath there is a list of internal consumers, where each of them consumes
single topic (subscribes single topic). When we invoke ```pause()```, then each
consumer of this list will be paused separately.
5. Now, when we send messages from ```producer1```, then
```multi-topic-consumer``` will not consume them. There is an exception,
because consumer can still receive messages until receiverQueueSize is used up
even if ```pause()``` was invoked, but after that this is impossible. For this
case it is irrelevant, we can skip it.
6. Create ```producer2``` which sends messages to topic
```persistent://public/default/foo2```. If ```multi-topic-consumer``` discovers
this new topic ```foo2``` (max after 10 seconds in our case), then messages
**will be consumed** by consumer **even if consumer is paused**. Underneath new
internal consumer is created, which subscribes ```foo2```, but this particular
consumer **is not paused**.
7. Now we have following state:
- ```multi-topic-consumer``` - is paused
- ```producer1``` sends messages to topic ```foo1``` and
```multi-topic-consumer``` cannot receive them - it is OK
- ```producer2``` sends messages to topic ```foo2``` and
```multi-topic-consumer``` is able to receive them - it is not expected behavior
**Expected behavior**
When new topic will be discovered/subscribed by multi-topic consumer during
consumer is paused, then this consumer should not be able to receive messages
from this topic.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]