This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new dad98f16 [Improve] Add InitialSubscriptionName for DLQPolicy (#1264)
dad98f16 is described below

commit dad98f16886e319291b93089a47e70835f08de0f
Author: crossoverJie <[email protected]>
AuthorDate: Thu Aug 1 12:04:24 2024 +0800

    [Improve] Add InitialSubscriptionName for DLQPolicy (#1264)
    
    Fixes #1239
    
    ### Modifications
    Add `InitialSubscriptionName` for DLQPolicy.
---
 pulsar/consumer.go           |  6 ++++
 pulsar/consumer_test.go      | 85 ++++++++++++++++++++++++++++++++++++++++++++
 pulsar/dlq_router.go         |  1 +
 pulsar/producer.go           |  6 ++++
 pulsar/producer_partition.go |  1 +
 5 files changed, 99 insertions(+)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 31f89a54..bf2eafbf 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -78,6 +78,12 @@ type DLQPolicy struct {
 
        // RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
        RetryLetterTopic string
+
+       // InitialSubscriptionName Name of the initial subscription name of the dead letter topic.
+       // If this field is not set, the initial subscription for the dead letter topic will not be created.
+       // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
+       // will fail to be created.
+       InitialSubscriptionName string
 }
 
 // AckGroupingOptions controls how to group ACK requests
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2ecdf8eb..04439cbc 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1539,6 +1539,91 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
        assert.Error(t, err)
        assert.Nil(t, msg)
 }
+func TestDeadLetterTopicWithInitialSubscription(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/" + newTopicName()
+       dlqSub, sub, consumerName := "init-sub", "my-sub", "my-consumer"
+       dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, sub)
+       ctx := context.Background()
+
+       // create consumer
+       maxRedeliveryCount, sendMessages := 1, 100
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    sub,
+               NackRedeliveryDelay: 1 * time.Second,
+               Type:                Shared,
+               DLQ: &DLQPolicy{
+                       MaxDeliveries:           uint32(maxRedeliveryCount),
+                       DeadLetterTopic:         dlqTopic,
+                       InitialSubscriptionName: dlqSub,
+               },
+               Name:              consumerName,
+               ReceiverQueueSize: sendMessages,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send messages
+       for i := 0; i < sendMessages; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       // nack all messages
+       for i := 0; i < sendMessages*(maxRedeliveryCount+1); i++ {
+               ctx, canc := context.WithTimeout(context.Background(), 3*time.Second)
+               defer canc()
+               msg, _ := consumer.Receive(ctx)
+               if msg == nil {
+                       break
+               }
+               consumer.Nack(msg)
+       }
+
+       // create dlq consumer
+       dlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            dlqTopic,
+               SubscriptionName: dlqSub,
+       })
+       assert.Nil(t, err)
+       defer dlqConsumer.Close()
+
+       for i := 0; i < sendMessages; i++ {
+               ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
+               defer canc()
+               msg, err := dlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.NotNil(t, msg)
+               err = dlqConsumer.Ack(msg)
+               assert.Nil(t, err)
+       }
+
+       // No more messages on the DLQ
+       ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
+       defer canc()
+       msg, err := dlqConsumer.Receive(ctx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+}
 
 func TestDLQMultiTopics(t *testing.T) {
        client, err := NewClient(ClientOptions{
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 6be35d74..647c022d 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -163,6 +163,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
                if opt.Name == "" {
                        opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)
                }
+               opt.initialSubscriptionName = r.policy.InitialSubscriptionName
 
                // the origin code sets to LZ4 compression with no options
                // so the new design allows compression type to be overwritten but still set lz4 by default
diff --git a/pulsar/producer.go b/pulsar/producer.go
index f8013a16..0ae51bd4 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -207,6 +207,12 @@ type ProducerOptions struct {
        // - ProducerAccessModeShared
        // - ProducerAccessModeExclusive
        ProducerAccessMode
+
+       // initialSubscriptionName Name of the initial subscription name of the dead letter topic.
+       // If this field is not set, the initial subscription for the dead letter topic will not be created.
+       // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
+       // will fail to be created.
+       initialSubscriptionName string
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f5fd493b..1677c570 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -273,6 +273,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
                Epoch:                    proto.Uint64(atomic.LoadUint64(&p.epoch)),
                UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
                ProducerAccessMode:       toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
+               InitialSubscriptionName:  proto.String(p.options.initialSubscriptionName),
        }
 
        if p.topicEpoch != nil {

Reply via email to