This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new dad98f16 [Improve] Add InitialSubscriptionName for DLQPolicy (#1264)
dad98f16 is described below
commit dad98f16886e319291b93089a47e70835f08de0f
Author: crossoverJie <[email protected]>
AuthorDate: Thu Aug 1 12:04:24 2024 +0800
[Improve] Add InitialSubscriptionName for DLQPolicy (#1264)
Fixes #1239
### Modifications
Add `InitialSubscriptionName` for DLQPolicy.
---
pulsar/consumer.go | 6 ++++
pulsar/consumer_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++++
pulsar/dlq_router.go | 1 +
pulsar/producer.go | 6 ++++
pulsar/producer_partition.go | 1 +
5 files changed, 99 insertions(+)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 31f89a54..bf2eafbf 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -78,6 +78,12 @@ type DLQPolicy struct {
// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
RetryLetterTopic string
+
+ // InitialSubscriptionName Name of the initial subscription name of the dead letter topic.
+ // If this field is not set, the initial subscription for the dead letter topic will not be created.
+ // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
+ // will fail to be created.
+ InitialSubscriptionName string
}
// AckGroupingOptions controls how to group ACK requests
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2ecdf8eb..04439cbc 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1539,6 +1539,91 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
assert.Error(t, err)
assert.Nil(t, msg)
}
+func TestDeadLetterTopicWithInitialSubscription(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/" + newTopicName()
+ dlqSub, sub, consumerName := "init-sub", "my-sub", "my-consumer"
+ dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, sub)
+ ctx := context.Background()
+
+ // create consumer
+ maxRedeliveryCount, sendMessages := 1, 100
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ NackRedeliveryDelay: 1 * time.Second,
+ Type: Shared,
+ DLQ: &DLQPolicy{
+ MaxDeliveries: uint32(maxRedeliveryCount),
+ DeadLetterTopic: dlqTopic,
+ InitialSubscriptionName: dlqSub,
+ },
+ Name: consumerName,
+ ReceiverQueueSize: sendMessages,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send messages
+ for i := 0; i < sendMessages; i++ {
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // nack all messages
+ for i := 0; i < sendMessages*(maxRedeliveryCount+1); i++ {
+ ctx, canc := context.WithTimeout(context.Background(),
3*time.Second)
+ defer canc()
+ msg, _ := consumer.Receive(ctx)
+ if msg == nil {
+ break
+ }
+ consumer.Nack(msg)
+ }
+
+ // create dlq consumer
+ dlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: dlqTopic,
+ SubscriptionName: dlqSub,
+ })
+ assert.Nil(t, err)
+ defer dlqConsumer.Close()
+
+ for i := 0; i < sendMessages; i++ {
+ ctx, canc := context.WithTimeout(context.Background(),
100*time.Millisecond)
+ defer canc()
+ msg, err := dlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ err = dlqConsumer.Ack(msg)
+ assert.Nil(t, err)
+ }
+
+ // No more messages on the DLQ
+ ctx, canc := context.WithTimeout(context.Background(),
100*time.Millisecond)
+ defer canc()
+ msg, err := dlqConsumer.Receive(ctx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+}
func TestDLQMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 6be35d74..647c022d 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -163,6 +163,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
if opt.Name == "" {
opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName,
r.subscriptionName, r.consumerName)
}
+ opt.initialSubscriptionName = r.policy.InitialSubscriptionName
// the origin code sets to LZ4 compression with no options
// so the new design allows compression type to be overwritten but still set lz4 by default
diff --git a/pulsar/producer.go b/pulsar/producer.go
index f8013a16..0ae51bd4 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -207,6 +207,12 @@ type ProducerOptions struct {
// - ProducerAccessModeShared
// - ProducerAccessModeExclusive
ProducerAccessMode
+
+ // initialSubscriptionName Name of the initial subscription name of the dead letter topic.
+ // If this field is not set, the initial subscription for the dead letter topic will not be created.
+ // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
+ // will fail to be created.
+ initialSubscriptionName string
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f5fd493b..1677c570 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -273,6 +273,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL
string) error {
Epoch:
proto.Uint64(atomic.LoadUint64(&p.epoch)),
UserProvidedProducerName:
proto.Bool(p.userProvidedProducerName),
ProducerAccessMode:
toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
+ InitialSubscriptionName:
proto.String(p.options.initialSubscriptionName),
}
if p.topicEpoch != nil {