This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/branch-0.12.0 by this push:
new b0db28ca [fix] Fix DLQ producer name conflicts when multiple
consumers send messages to DLQ (#1156)
b0db28ca is described below
commit b0db28caf6fb8c2622b8d38bc13c1e52c50e9e07
Author: crossoverJie <[email protected]>
AuthorDate: Mon Jan 15 10:02:23 2024 +0800
[fix] Fix DLQ producer name conflicts when multiple consumers send
messages to DLQ (#1156)
### Motivation
To keep consistent with the Java client.
Related PR: https://github.com/apache/pulsar/pull/21890
### Modifications
Set the DLQ producerName to `fmt.Sprintf("%s-%s-%s-DLQ", r.topicName,
r.subscriptionName, r.consumerName)`
(cherry picked from commit 4e138228fe501ec39856afbc5e541e87a143b73f)
---
pulsar/consumer_impl.go | 2 +-
pulsar/consumer_regex_test.go | 6 ++++--
pulsar/consumer_test.go | 6 ++++--
pulsar/dlq_router.go | 6 ++++--
pulsar/reader_impl.go | 2 +-
5 files changed, 14 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index d701ab16..75d839b4 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
}
}
- dlq, err := newDlqRouter(client, options.DLQ, options.Topic,
options.SubscriptionName, client.log)
+ dlq, err := newDlqRouter(client, options.DLQ, options.Topic,
options.SubscriptionName, options.Name, client.log)
if err != nil {
return nil, err
}
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 3e5f1d61..ebd7e4e1 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c
Client, namespace string
opts := ConsumerOptions{
SubscriptionName: "regex-sub",
AutoDiscoveryPeriod: 5 * time.Minute,
+ Name: "regex-consumer",
}
- dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
log.DefaultNopLogger())
+ dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
"regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false,
log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern,
make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
@@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c
Client, namespace string
opts := ConsumerOptions{
SubscriptionName: "regex-sub",
AutoDiscoveryPeriod: 5 * time.Minute,
+ Name: "regex-consumer",
}
- dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
log.DefaultNopLogger())
+ dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
"regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false,
log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern,
make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 8b983d0d..df70b0dd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
if prodOpt != nil {
dlqPolicy.ProducerOptions = *prodOpt
}
- sub := "my-sub"
+ sub, consumerName := "my-sub", "my-consumer"
+
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &dlqPolicy,
+ Name: consumerName,
})
assert.Nil(t, err)
defer consumer.Close()
@@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
assert.Equal(t, []byte(expectMsg), msg.Payload())
// check dql produceName
- assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ",
topic, sub))
+ assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ",
topic, sub, consumerName))
// check original messageId
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 5b9314bd..6be35d74 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -34,16 +34,18 @@ type dlqRouter struct {
closeCh chan interface{}
topicName string
subscriptionName string
+ consumerName string
log log.Logger
}
-func newDlqRouter(client Client, policy *DLQPolicy, topicName,
subscriptionName string,
+func newDlqRouter(client Client, policy *DLQPolicy, topicName,
subscriptionName, consumerName string,
logger log.Logger) (*dlqRouter, error) {
r := &dlqRouter{
client: client,
policy: policy,
topicName: topicName,
subscriptionName: subscriptionName,
+ consumerName: consumerName,
log: logger,
}
@@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
if opt.Name == "" {
- opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName,
r.subscriptionName)
+ opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName,
r.subscriptionName, r.consumerName)
}
// the origin code sets to LZ4 compression with no options
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0999e88f..7b260b88 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
}
// Provide dummy dlq router with not dlq policy
- dlq, err := newDlqRouter(client, nil, options.Topic,
options.SubscriptionName, client.log)
+ dlq, err := newDlqRouter(client, nil, options.Topic,
options.SubscriptionName, options.Name, client.log)
if err != nil {
return nil, err
}