RobertIndie commented on code in PR #1141:
URL: https://github.com/apache/pulsar-client-go/pull/1141#discussion_r1417203425


##########
pulsar/consumer_regex.go:
##########
@@ -320,6 +319,25 @@ func (c *regexConsumer) closed() bool {
 }
 
 func (c *regexConsumer) monitor() {
+       defer close(c.subscribeCh)
+       defer close(c.unsubscribeCh)
+
+       go func() {
+               for topics := range c.subscribeCh {
+                       if len(topics) > 0 && !c.closed() {
+                               c.subscribe(topics, c.dlq, c.rlq)
+                       }
+               }
+       }()
+
+       go func() {
+               for topics := range c.unsubscribeCh {
+                       if len(topics) > 0 && !c.closed() {
+                               c.unsubscribe(topics)
+                       }
+               }
+       }()

Review Comment:
   What about combining these two channels into one single channel?
   Otherwise, under poor network conditions, where subscribing or 
unsubscribing may take a long time, scenarios like the following might 
occur:
   
   - A consumer is created.
   - Topic A is created.
   - The consumer discovers the topic and starts to subscribe to Topic A.
   - Topic A is deleted.
   - The consumer discovers the topic again, but the knownTopics list remains 
unchanged. Thus, the consumer doesn't unsubscribe from Topic A.
   - Topic A is subscribed to, but this action fails.
   
   This chain of events leads to the final outcome: Topic A should be 
unsubscribed from, but it isn't.
   
   Another scenario that could result in data loss:
   - A consumer is created and subscribes to Topic A.
   - Topic A is deleted.
   - The consumer discovers the topic and begins to unsubscribe from Topic A.
   - Topic A is recreated.
   - The consumer discovers the topic again, but the knownTopics list remains 
unchanged. As a result, the consumer doesn't resubscribe to Topic A.
   - Topic A remains unsubscribed.
   
   The final outcome in this case: Topic A is not subscribed to, and any 
subsequent messages in Topic A will be lost.
   
   Merging the channels could simplify this logic and make it more robust. It 
would be better to handle every change to the topics list on a single 
goroutine, so that subscribe and unsubscribe requests are applied in the order 
they were discovered. The Java client guarantees this, which prevents the 
issues described above.
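
A minimal sketch of the single-channel idea, not the actual change for this PR. The names `topicOp`, `topicWorker`, and `newTopicWorker` are hypothetical, and the real `c.subscribe` / `c.unsubscribe` calls are replaced by a plain map update so the example stays self-contained. The point is only that one goroutine drains one channel, so a subscribe and an unsubscribe for the same topic can never run concurrently or out of order.

```go
package main

import "sync"

// topicOp is a hypothetical event: one subscribe or unsubscribe request.
type topicOp struct {
	subscribe bool
	topics    []string
}

// topicWorker applies operations from a single channel, one at a time.
type topicWorker struct {
	ops  chan topicOp
	done chan struct{}

	mu         sync.Mutex
	subscribed map[string]bool // stands in for the consumer's real state
}

func newTopicWorker() *topicWorker {
	w := &topicWorker{
		ops:        make(chan topicOp, 16),
		done:       make(chan struct{}),
		subscribed: make(map[string]bool),
	}
	go w.run()
	return w
}

// run is the only goroutine that mutates the subscription state, so
// operations take effect in exactly the order they were sent.
func (w *topicWorker) run() {
	defer close(w.done)
	for op := range w.ops {
		w.mu.Lock()
		for _, t := range op.topics {
			if op.subscribe {
				w.subscribed[t] = true // real code would subscribe here
			} else {
				delete(w.subscribed, t) // real code would unsubscribe here
			}
		}
		w.mu.Unlock()
	}
}

// close stops accepting operations and waits for queued ones to finish.
func (w *topicWorker) close() {
	close(w.ops)
	<-w.done
}

func (w *topicWorker) isSubscribed(topic string) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.subscribed[topic]
}
```

With this shape, the second scenario above (unsubscribe, then the topic is recreated) becomes two queued operations: the resubscribe is processed only after the unsubscribe has finished, so Topic A ends up subscribed.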
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
