This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f57a6f  [Issue: 201] Expose GetDeliveryCount method from Message 
interface (#202)
2f57a6f is described below

commit 2f57a6f86a0ca076def0347564fed64f640d9c9b
Author: 冉小龙 <[email protected]>
AuthorDate: Sat Mar 21 14:06:10 2020 +0800

    [Issue: 201] Expose GetDeliveryCount method from Message interface (#202)
    
    * Expose GetDeliveryCount method from Message interface
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * fix ci error
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 pulsar/consumer_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/dlq_router.go    |  3 ++-
 pulsar/impl_message.go  |  4 +++
 pulsar/message.go       |  7 ++++++
 4 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3e256f8..488ca59 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1143,3 +1143,68 @@ func TestDLQMultiTopics(t *testing.T) {
        assert.Error(t, err)
        assert.Nil(t, msg)
 }
+
+func TestGetDeliveryCount(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    "my-sub",
+               NackRedeliveryDelay: 1 * time.Second,
+               Type:                Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       // receive 10 messages and only ack half-of-them
+       for i := 0; i < 10; i++ {
+               msg, _ := consumer.Receive(context.Background())
+
+               if i%2 == 0 {
+                       // ack message
+                       consumer.Ack(msg)
+               } else {
+                       consumer.Nack(msg)
+               }
+       }
+
+       // Receive the unacked messages other 2 times, failing at processing
+       for i := 0; i < 2; i++ {
+               var msg Message
+               for i := 0; i < 5; i++ {
+                       msg, err = consumer.Receive(context.Background())
+                       assert.Nil(t, err)
+                       consumer.Nack(msg)
+               }
+               assert.Equal(t, uint32(i+1), msg.RedeliveryCount())
+       }
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       assert.Equal(t, uint32(3), msg.RedeliveryCount())
+}
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 69b45d7..d6e7b30 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -76,7 +76,8 @@ func (r *dlqRouter) shouldSendToDlq(cm *ConsumerMessage) bool 
{
        //  * when we receive the message and redeliveryCount == 10, it means
        //    that the application has already got (and Nack())  the message 10
        //    times, so this time we should just go to DLQ.
-       return cm.Message.(*message).redeliveryCount >= r.policy.MaxDeliveries
+
+       return msg.redeliveryCount >= r.policy.MaxDeliveries
 }
 
 func (r *dlqRouter) Chan() chan ConsumerMessage {
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a8033a7..9b85c8a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -176,6 +176,10 @@ func (msg *message) Key() string {
        return msg.key
 }
 
+func (msg *message) RedeliveryCount() uint32 {
+       return msg.redeliveryCount
+}
+
 func newAckTracker(size int) *ackTracker {
        var batchIDs *big.Int
        if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index bc38ebb..8505321 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -86,6 +86,13 @@ type Message interface {
 
        // Key get the key of the message, if any
        Key() string
+
+       // Get message redelivery count, redelivery count maintain in pulsar 
broker. When client nack acknowledge messages,
+       // broker will dispatch message again with message redelivery count in 
CommandMessage defined.
+       //
+       // Message redelivery increases monotonically in a broker, when topic 
switch ownership to a another broker
+       // redelivery count will be recalculated.
+       RedeliveryCount() uint32
 }
 
 // MessageID identifier for a particular message

Reply via email to