This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 279e1d74 [Issue 1297][consumer] Fix DLQ producer name conflicts when
there are same name consumers (#1314)
279e1d74 is described below
commit 279e1d74c216465bccdde492f392c9b5d52e4546
Author: zhou zhuohan <[email protected]>
AuthorDate: Mon Dec 9 16:04:06 2024 +0800
[Issue 1297][consumer] Fix DLQ producer name conflicts when there are same
name consumers (#1314)
---
pulsar/consumer_test.go | 6 ++++--
pulsar/dlq_router.go | 2 +-
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 07b34ee5..eae8dd3c 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -24,6 +24,7 @@ import (
"log"
"net/http"
"os"
+ "regexp"
"strconv"
"sync"
"sync/atomic"
@@ -1521,8 +1522,9 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
assert.Equal(t, []byte(expectMsg), msg.Payload())
- // check dql produceName
- assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ",
topic, sub, consumerName))
+ // check dlq produceName
+ regex :=
regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, sub,
consumerName))
+ assert.True(t, regex.MatchString(msg.ProducerName()))
// check original messageId
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 7d908ff6..c65f01aa 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -172,7 +172,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
if opt.Name == "" {
- opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName,
r.subscriptionName, r.consumerName)
+ opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName,
r.subscriptionName, r.consumerName, generateRandomName())
}
opt.initialSubscriptionName = r.policy.InitialSubscriptionName