This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1e9d4d5 Added integration test for acks and nacks (#97)
1e9d4d5 is described below
commit 1e9d4d59eeaea8b15256b8ac1344ecbd297e296d
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 13 20:41:58 2019 -0800
Added integration test for acks and nacks (#97)
---
pulsar/consumer.go | 2 +-
pulsar/consumer_impl.go | 6 +--
pulsar/consumer_test.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 132 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 961611e..9fec475 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -109,7 +109,7 @@ type ConsumerOptions struct {
// The delay after which to redeliver the messages that failed to be
// processed. Default is 1min. (See `Consumer.Nack()`)
- NackRedeliveryDelay *time.Duration
+ NackRedeliveryDelay time.Duration
// Set the consumer name.
Name string
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 0a628b7..216ad0d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -121,10 +121,10 @@ func topicSubscribe(client *client, options
ConsumerOptions, topic string,
defer wg.Done()
var nackRedeliveryDelay time.Duration
- if options.NackRedeliveryDelay == nil {
+ if options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
- nackRedeliveryDelay =
*options.NackRedeliveryDelay
+ nackRedeliveryDelay =
options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
@@ -232,7 +232,7 @@ func (c *consumer) AckID(msgID MessageID) {
}
func (c *consumer) Nack(msg Message) {
- c.AckID(msg.ID())
+ c.NackID(msg.ID())
}
func (c *consumer) NackID(msgID MessageID) {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index c45b932..cc40d22 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -547,3 +547,131 @@ func TestConsumerFlow(t *testing.T) {
assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum),
string(msg.Payload()))
}
}
+
+func TestConsumerAck(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+
+ const N = 100
+
+ for i := 0; i < N; i++ {
+ if err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+
+ if i < N/2 {
+ // Only acks the first half of messages
+ consumer.Ack(msg)
+ }
+ }
+
+ err = consumer.Close()
+ assert.Nil(t, err)
+
+ // Subscribe again
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // We should only receive the 2nd half of messages
+ for i := N / 2; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+
+ consumer.Ack(msg)
+ }
+}
+
+func TestConsumerNack(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+
+ const N = 100
+
+ for i := 0; i < N; i++ {
+ if err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+
+ if i%2 == 0 {
+ // Only acks even messages
+ consumer.Ack(msg)
+ } else {
+ // Fails to process odd messages
+ consumer.Nack(msg)
+ }
+ }
+
+ // Failed messages should be resent
+
+ // We should only receive the odd messages
+ for i := 1; i < N; i += 2 {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+
+ consumer.Ack(msg)
+ }
+}