git-enzo opened a new issue #8214:
URL: https://github.com/apache/pulsar/issues/8214


   **Describe the bug**
   When we pause a consumer which subscribes to multiple topics, for example by 
parsing a regular expression, and then the new producer sends a message to the 
new topic that the consumer discovers/subscribes, then messages from that topic 
will be "consumable" by the consumer during pause.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a ```multi-topic-consumer``` which subscribes to multiple topics 
by parsing regular expression, for example 
```persistent://public/default/foo.*```. This type of consumer periodically 
checks if new topics are available based on regexp 
(patternAutoDiscoveryPeriod). Let's say, we set this property to 10 sec. For 
example:
   
   ```
   Pattern topicsPattern = Pattern.compile("persistent://public/default/foo.*");
   pulsarClient
           .newConsumer()
           .subscriptionName("subscription")
           .subscriptionType(SubscriptionType.Shared)
           .topicsPattern(topicsPattern)
           .patternAutoDiscoveryPeriod(10, TimeUnit.SECONDS)
           .receiverQueueSize(1)
           .subscribe();
   ```
   
   2. Create ```producer1``` which sends messages to topic 
```persistent://public/default/foo1```. If ```multi-topic-consumer``` discovers 
this new topic (max after 10 seconds in our case), then messages will be 
consumed by consumer. Example producer:
   ```
   Producer<String> producer1 = pulsarClient
           .newProducer()
           .topic("foo1")
           .create();
   ```
   3. Now, if ```producer1``` sends messages, a ```multi-topic-consumer``` can 
receive them.
   4. Let's **pause** ```multi-topic-consumer``` by invoking ```pause()```. In 
this case consumer is a PatternMultiTopicsConsumerImpl type. That means 
underneath there is a list of internal consumers, where each of them consumes 
single topic (subscribes single topic). When we invoke ```pause()```, then each 
consumer of this list will be paused separately.
   5. Now, when we send messages from ```producer1```, then 
```multi-topic-consumer``` will not consume them. There is an exception, 
because consumer can still receive messages until receiverQueueSize is used up 
even if ```pause()``` was invoked, but after that this is impossible. For this 
case it is irrelevant, we can skip it.
   6. Create ```producer2``` which sends messages to topic 
```persistent://public/default/foo2```. If ```multi-topic-consumer``` discovers 
this new topic ```foo2``` (max after 10 seconds in our case), then messages 
**will be consumed** by consumer **even if consumer is paused**. Underneath new 
internal consumer is created, which subscribes ```foo2```, but this particular 
consumer **is not paused**.
   7. Now we have following state:
   -  ```multi-topic-consumer``` - is paused
   - ```producer1``` sends messages to topic ```foo1``` and 
```multi-topic-consumer``` cannot receive them - it is OK
   - ```producer2``` sends messages to topic ```foo2``` and 
```multi-topic-consumer``` is able to receive them - it is not expected behavior
   
   
   **Expected behavior**
   When new topic will be discovered/subscribed by multi-topic consumer during 
consumer is paused, then this consumer should not be able to receive messages 
from this topic.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to