This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/branch-0.12.0 by this push:
     new b0db28ca [fix] Fix DLQ producer name conflicts when multiples 
consumers send messages to DLQ (#1156)
b0db28ca is described below

commit b0db28caf6fb8c2622b8d38bc13c1e52c50e9e07
Author: crossoverJie <[email protected]>
AuthorDate: Mon Jan 15 10:02:23 2024 +0800

    [fix] Fix DLQ producer name conflicts when multiples consumers send 
messages to DLQ (#1156)
    
    ### Motivation
    
    To keep consistent with the Java client.
    Releted PR: https://github.com/apache/pulsar/pull/21890
    
    ### Modifications
    
    Set DLQ producerName `fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, 
r.subscriptionName, r.consumerName)`
    
    (cherry picked from commit 4e138228fe501ec39856afbc5e541e87a143b73f)
---
 pulsar/consumer_impl.go       | 2 +-
 pulsar/consumer_regex_test.go | 6 ++++--
 pulsar/consumer_test.go       | 6 ++++--
 pulsar/dlq_router.go          | 6 ++++--
 pulsar/reader_impl.go         | 2 +-
 5 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index d701ab16..75d839b4 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                }
        }
 
-       dlq, err := newDlqRouter(client, options.DLQ, options.Topic, 
options.SubscriptionName, client.log)
+       dlq, err := newDlqRouter(client, options.DLQ, options.Topic, 
options.SubscriptionName, options.Name, client.log)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 3e5f1d61..ebd7e4e1 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c 
Client, namespace string
        opts := ConsumerOptions{
                SubscriptionName:    "regex-sub",
                AutoDiscoveryPeriod: 5 * time.Minute,
+               Name:                "regex-consumer",
        }
 
-       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
log.DefaultNopLogger())
+       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
"regex-consumer", log.DefaultNopLogger())
        rlq, _ := newRetryRouter(c.(*client), nil, false, 
log.DefaultNopLogger())
        consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
@@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
        opts := ConsumerOptions{
                SubscriptionName:    "regex-sub",
                AutoDiscoveryPeriod: 5 * time.Minute,
+               Name:                "regex-consumer",
        }
 
-       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
log.DefaultNopLogger())
+       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
"regex-consumer", log.DefaultNopLogger())
        rlq, _ := newRetryRouter(c.(*client), nil, false, 
log.DefaultNopLogger())
        consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 8b983d0d..df70b0dd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt 
*ProducerOptions) {
        if prodOpt != nil {
                dlqPolicy.ProducerOptions = *prodOpt
        }
-       sub := "my-sub"
+       sub, consumerName := "my-sub", "my-consumer"
+
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:               topic,
                SubscriptionName:    sub,
                NackRedeliveryDelay: 1 * time.Second,
                Type:                Shared,
                DLQ:                 &dlqPolicy,
+               Name:                consumerName,
        })
        assert.Nil(t, err)
        defer consumer.Close()
@@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt 
*ProducerOptions) {
                assert.Equal(t, []byte(expectMsg), msg.Payload())
 
                // check dql produceName
-               assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", 
topic, sub))
+               assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", 
topic, sub, consumerName))
 
                // check original messageId
                assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 5b9314bd..6be35d74 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -34,16 +34,18 @@ type dlqRouter struct {
        closeCh          chan interface{}
        topicName        string
        subscriptionName string
+       consumerName     string
        log              log.Logger
 }
 
-func newDlqRouter(client Client, policy *DLQPolicy, topicName, 
subscriptionName string,
+func newDlqRouter(client Client, policy *DLQPolicy, topicName, 
subscriptionName, consumerName string,
        logger log.Logger) (*dlqRouter, error) {
        r := &dlqRouter{
                client:           client,
                policy:           policy,
                topicName:        topicName,
                subscriptionName: subscriptionName,
+               consumerName:     consumerName,
                log:              logger,
        }
 
@@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
                opt.Topic = r.policy.DeadLetterTopic
                opt.Schema = schema
                if opt.Name == "" {
-                       opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, 
r.subscriptionName)
+                       opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, 
r.subscriptionName, r.consumerName)
                }
 
                // the origin code sets to LZ4 compression with no options
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0999e88f..7b260b88 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
        }
 
        // Provide dummy dlq router with not dlq policy
-       dlq, err := newDlqRouter(client, nil, options.Topic, 
options.SubscriptionName, client.log)
+       dlq, err := newDlqRouter(client, nil, options.Topic, 
options.SubscriptionName, options.Name, client.log)
        if err != nil {
                return nil, err
        }

Reply via email to