This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 72aed95a [fix] fix channel deadlock in regexp consumer (#1141)
72aed95a is described below
commit 72aed95ae17baa4aa5ad8db7fb4abbf96f6c1c59
Author: Gonçalo Rodrigues <[email protected]>
AuthorDate: Fri Dec 8 10:53:41 2023 +0100
[fix] fix channel deadlock in regexp consumer (#1141)
### Motivation
When using a regexp consumer, there's a race condition between producing
and consuming newly discovered topics. If topic discovery takes too long or
the auto-discovery period is too short, then multiple ticker.C messages may be
processed in a row, which will block on the subscribe/unsubscribe channels as
they only have a buffer size of 1. This will block new topics from being
discovered forever.
### Modifications
Moved the consumers into their own goroutines and used an unbuffered
channel. There are multiple ways to go about it, but it's good practice to keep
consumers and producers separate. Consumers run until the channels they
consume from are closed, which happens when the producer (monitor) returns.
---
pulsar/consumer_regex.go | 31 ++++++++-------------
pulsar/consumer_regex_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 74 insertions(+), 20 deletions(-)
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 79e2293b..d36694ef 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -50,8 +50,6 @@ type regexConsumer struct {
consumersLock sync.Mutex
consumers map[string]Consumer
- subscribeCh chan []string
- unsubscribeCh chan []string
closeOnce sync.Once
closeCh chan struct{}
@@ -75,9 +73,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn
*internal.TopicName, p
namespace: tn.Namespace,
pattern: pattern,
- consumers: make(map[string]Consumer),
- subscribeCh: make(chan []string, 1),
- unsubscribeCh: make(chan []string, 1),
+ consumers: make(map[string]Consumer),
closeCh: make(chan struct{}),
@@ -163,12 +159,11 @@ func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}
-func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message,
customProperties map[string]string,
- delay time.Duration) {
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _
map[string]string, _ time.Duration) {
c.log.Warnf("regexp consumer not support
ReconsumeLaterWithCustomProperties yet.")
}
@@ -297,11 +292,11 @@ func (c *regexConsumer) Close() {
})
}
-func (c *regexConsumer) Seek(msgID MessageID) error {
+func (c *regexConsumer) Seek(_ MessageID) error {
return newError(SeekFailed, "seek command not allowed for regex
consumer")
}
-func (c *regexConsumer) SeekByTime(time time.Time) error {
+func (c *regexConsumer) SeekByTime(_ time.Time) error {
return newError(SeekFailed, "seek command not allowed for regex
consumer")
}
@@ -329,14 +324,6 @@ func (c *regexConsumer) monitor() {
if !c.closed() {
c.discover()
}
- case topics := <-c.subscribeCh:
- if len(topics) > 0 && !c.closed() {
- c.subscribe(topics, c.dlq, c.rlq)
- }
- case topics := <-c.unsubscribeCh:
- if len(topics) > 0 && !c.closed() {
- c.unsubscribe(topics)
- }
}
}
}
@@ -358,8 +345,12 @@ func (c *regexConsumer) discover() {
}).
Debug("discover topics")
- c.unsubscribeCh <- staleTopics
- c.subscribeCh <- newTopics
+ if len(staleTopics) > 0 {
+ c.unsubscribe(staleTopics)
+ }
+ if len(newTopics) > 0 {
+ c.subscribe(newTopics, c.dlq, c.rlq)
+ }
}
func (c *regexConsumer) knownTopics() []string {
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index de946134..3e5f1d61 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -241,6 +241,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c
Client, namespace string
func TestRegexConsumer(t *testing.T) {
t.Run("MatchOneTopic",
runWithClientNamespace(runRegexConsumerMatchOneTopic))
t.Run("AddTopic",
runWithClientNamespace(runRegexConsumerAddMatchingTopic))
+ t.Run("AutoDiscoverTopics",
runWithClientNamespace(runRegexConsumerAutoDiscoverTopics))
}
func runRegexConsumerMatchOneTopic(t *testing.T, c Client, namespace string) {
@@ -346,6 +347,68 @@ func runRegexConsumerAddMatchingTopic(t *testing.T, c
Client, namespace string)
}
}
+func runRegexConsumerAutoDiscoverTopics(t *testing.T, c Client, namespace
string) {
+ topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
+ opts := ConsumerOptions{
+ TopicsPattern: topicsPattern,
+ SubscriptionName: "regex-sub",
+ // this is purposefully short to test parallelism between
discover and subscribe calls
+ AutoDiscoveryPeriod: 1 * time.Nanosecond,
+ }
+ consumer, err := c.Subscribe(opts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer consumer.Close()
+
+ topicInRegex1 := namespace + "/foo-topic-1"
+ p1, err := c.CreateProducer(ProducerOptions{
+ Topic: topicInRegex1,
+ DisableBatching: true,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer p1.Close()
+
+ topicInRegex2 := namespace + "/foo-topic-2"
+ p2, err := c.CreateProducer(ProducerOptions{
+ Topic: topicInRegex2,
+ DisableBatching: true,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer p2.Close()
+
+ time.Sleep(100 * time.Millisecond)
+
+ err = genMessages(p1, 5, func(idx int) string {
+ return fmt.Sprintf("foo-message-%d", idx)
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = genMessages(p2, 5, func(idx int) string {
+ return fmt.Sprintf("foo-message-%d", idx)
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ctx := context.Background()
+ for i := 0; i < 10; i++ {
+ m, err := consumer.Receive(ctx)
+ if err != nil {
+ t.Errorf("failed to receive message error: %+v", err)
+ } else {
+ assert.Truef(t, strings.HasPrefix(string(m.Payload()),
"foo-"),
+ "message does not start with foo: %s",
string(m.Payload()))
+ }
+ }
+}
+
func genMessages(p Producer, num int, msgFn func(idx int) string) error {
ctx := context.Background()
for i := 0; i < num; i++ {