This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 72aed95a [fix] fix channel deadlock in regexp consumer (#1141)
72aed95a is described below

commit 72aed95ae17baa4aa5ad8db7fb4abbf96f6c1c59
Author: Gonçalo Rodrigues <[email protected]>
AuthorDate: Fri Dec 8 10:53:41 2023 +0100

    [fix] fix channel deadlock in regexp consumer (#1141)
    
    ### Motivation
    
    When using a regexp consumer, there's a race condition between producing 
and consuming newly discovered topics. If topic discovery takes too long or 
the auto discovery period is too short, then multiple ticker.C messages may be 
processed in a row, which will block on the subscribe/unsubscribe channels as 
they only have a buffer size of 1. This will block new topics from being 
discovered forever.
    
    ### Modifications
    
    Moved the consumers into their own goroutines and used an unbuffered 
channel. There are multiple ways to go about it, but it's good practice to keep 
consumers and producers separate. Consumers run until the channels they 
consume from are closed, which happens when the producer (monitor) returns.
---
 pulsar/consumer_regex.go      | 31 ++++++++-------------
 pulsar/consumer_regex_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 20 deletions(-)

diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 79e2293b..d36694ef 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -50,8 +50,6 @@ type regexConsumer struct {
 
        consumersLock sync.Mutex
        consumers     map[string]Consumer
-       subscribeCh   chan []string
-       unsubscribeCh chan []string
 
        closeOnce sync.Once
        closeCh   chan struct{}
@@ -75,9 +73,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn 
*internal.TopicName, p
                namespace: tn.Namespace,
                pattern:   pattern,
 
-               consumers:     make(map[string]Consumer),
-               subscribeCh:   make(chan []string, 1),
-               unsubscribeCh: make(chan []string, 1),
+               consumers: make(map[string]Consumer),
 
                closeCh: make(chan struct{}),
 
@@ -163,12 +159,11 @@ func (c *regexConsumer) Ack(msg Message) error {
        return c.AckID(msg.ID())
 }
 
-func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
        c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
 }
 
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, 
customProperties map[string]string,
-       delay time.Duration) {
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ 
map[string]string, _ time.Duration) {
        c.log.Warnf("regexp consumer not support 
ReconsumeLaterWithCustomProperties yet.")
 }
 
@@ -297,11 +292,11 @@ func (c *regexConsumer) Close() {
        })
 }
 
-func (c *regexConsumer) Seek(msgID MessageID) error {
+func (c *regexConsumer) Seek(_ MessageID) error {
        return newError(SeekFailed, "seek command not allowed for regex 
consumer")
 }
 
-func (c *regexConsumer) SeekByTime(time time.Time) error {
+func (c *regexConsumer) SeekByTime(_ time.Time) error {
        return newError(SeekFailed, "seek command not allowed for regex 
consumer")
 }
 
@@ -329,14 +324,6 @@ func (c *regexConsumer) monitor() {
                        if !c.closed() {
                                c.discover()
                        }
-               case topics := <-c.subscribeCh:
-                       if len(topics) > 0 && !c.closed() {
-                               c.subscribe(topics, c.dlq, c.rlq)
-                       }
-               case topics := <-c.unsubscribeCh:
-                       if len(topics) > 0 && !c.closed() {
-                               c.unsubscribe(topics)
-                       }
                }
        }
 }
@@ -358,8 +345,12 @@ func (c *regexConsumer) discover() {
                }).
                Debug("discover topics")
 
-       c.unsubscribeCh <- staleTopics
-       c.subscribeCh <- newTopics
+       if len(staleTopics) > 0 {
+               c.unsubscribe(staleTopics)
+       }
+       if len(newTopics) > 0 {
+               c.subscribe(newTopics, c.dlq, c.rlq)
+       }
 }
 
 func (c *regexConsumer) knownTopics() []string {
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index de946134..3e5f1d61 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -241,6 +241,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
 func TestRegexConsumer(t *testing.T) {
        t.Run("MatchOneTopic", 
runWithClientNamespace(runRegexConsumerMatchOneTopic))
        t.Run("AddTopic", 
runWithClientNamespace(runRegexConsumerAddMatchingTopic))
+       t.Run("AutoDiscoverTopics", 
runWithClientNamespace(runRegexConsumerAutoDiscoverTopics))
 }
 
 func runRegexConsumerMatchOneTopic(t *testing.T, c Client, namespace string) {
@@ -346,6 +347,68 @@ func runRegexConsumerAddMatchingTopic(t *testing.T, c 
Client, namespace string)
        }
 }
 
+func runRegexConsumerAutoDiscoverTopics(t *testing.T, c Client, namespace 
string) {
+       topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
+       opts := ConsumerOptions{
+               TopicsPattern:    topicsPattern,
+               SubscriptionName: "regex-sub",
+               // this is purposefully short to test parallelism between 
discover and subscribe calls
+               AutoDiscoveryPeriod: 1 * time.Nanosecond,
+       }
+       consumer, err := c.Subscribe(opts)
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer consumer.Close()
+
+       topicInRegex1 := namespace + "/foo-topic-1"
+       p1, err := c.CreateProducer(ProducerOptions{
+               Topic:           topicInRegex1,
+               DisableBatching: true,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer p1.Close()
+
+       topicInRegex2 := namespace + "/foo-topic-2"
+       p2, err := c.CreateProducer(ProducerOptions{
+               Topic:           topicInRegex2,
+               DisableBatching: true,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer p2.Close()
+
+       time.Sleep(100 * time.Millisecond)
+
+       err = genMessages(p1, 5, func(idx int) string {
+               return fmt.Sprintf("foo-message-%d", idx)
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       err = genMessages(p2, 5, func(idx int) string {
+               return fmt.Sprintf("foo-message-%d", idx)
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               m, err := consumer.Receive(ctx)
+               if err != nil {
+                       t.Errorf("failed to receive message error: %+v", err)
+               } else {
+                       assert.Truef(t, strings.HasPrefix(string(m.Payload()), 
"foo-"),
+                               "message does not start with foo: %s", 
string(m.Payload()))
+               }
+       }
+}
+
 func genMessages(p Producer, num int, msgFn func(idx int) string) error {
        ctx := context.Background()
        for i := 0; i < num; i++ {

Reply via email to