This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e9d4d5  Added integration test for acks and nacks (#97)
1e9d4d5 is described below

commit 1e9d4d59eeaea8b15256b8ac1344ecbd297e296d
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 13 20:41:58 2019 -0800

    Added integration test for acks and nacks (#97)
---
 pulsar/consumer.go      |   2 +-
 pulsar/consumer_impl.go |   6 +--
 pulsar/consumer_test.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 132 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 961611e..9fec475 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -109,7 +109,7 @@ type ConsumerOptions struct {
 
        // The delay after which to redeliver the messages that failed to be
        // processed. Default is 1min. (See `Consumer.Nack()`)
-       NackRedeliveryDelay *time.Duration
+       NackRedeliveryDelay time.Duration
 
        // Set the consumer name.
        Name string
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 0a628b7..216ad0d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -121,10 +121,10 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string,
                        defer wg.Done()
 
                        var nackRedeliveryDelay time.Duration
-                       if options.NackRedeliveryDelay == nil {
+                       if options.NackRedeliveryDelay == 0 {
                                nackRedeliveryDelay = defaultNackRedeliveryDelay
                        } else {
-                               nackRedeliveryDelay = *options.NackRedeliveryDelay
+                               nackRedeliveryDelay = options.NackRedeliveryDelay
                        }
                        opts := &partitionConsumerOpts{
                                topic:               pt,
@@ -232,7 +232,7 @@ func (c *consumer) AckID(msgID MessageID) {
 }
 
 func (c *consumer) Nack(msg Message) {
-       c.AckID(msg.ID())
+       c.NackID(msg.ID())
 }
 
 func (c *consumer) NackID(msgID MessageID) {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index c45b932..cc40d22 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -547,3 +547,131 @@ func TestConsumerFlow(t *testing.T) {
                assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload()))
        }
 }
+
+func TestConsumerAck(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+
+       const N = 100
+
+       for i := 0; i < N; i++ {
+               if err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+
+               if i < N/2 {
+                       // Only acks the first half of messages
+                       consumer.Ack(msg)
+               }
+       }
+
+       err = consumer.Close()
+       assert.Nil(t, err)
+
+       // Subscribe again
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // We should only receive the 2nd half of messages
+       for i := N / 2; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+
+               consumer.Ack(msg)
+       }
+}
+
+func TestConsumerNack(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topicName,
+               SubscriptionName:    "sub-1",
+               Type:                Shared,
+               NackRedeliveryDelay: 1 * time.Second,
+       })
+       assert.Nil(t, err)
+
+       const N = 100
+
+       for i := 0; i < N; i++ {
+               if err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+
+               if i%2 == 0 {
+                       // Only acks even messages
+                       consumer.Ack(msg)
+               } else {
+                       // Fails to process odd messages
+                       consumer.Nack(msg)
+               }
+       }
+
+       // Failed messages should be resent
+
+       // We should only receive the odd messages
+       for i := 1; i < N; i += 2 {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+
+               consumer.Ack(msg)
+       }
+}

Reply via email to