This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cf031b8  add messageId and topic as props of DLQ message (#907)
cf031b8 is described below

commit cf031b8951aec1356f2e064f1bf679d24c81ccd0
Author: Garule Prabhudas <[email protected]>
AuthorDate: Tue Jan 10 17:46:33 2023 +0530

    add messageId and topic as props of DLQ message (#907)
    
    Co-authored-by: Prabhudas Garule <[email protected]>
---
 pulsar/consumer_test.go |  6 ++++++
 pulsar/dlq_router.go    | 12 +++++++++++-
 pulsar/message.go       |  3 +++
 pulsar/reader_test.go   |  8 ++++++++
 pulsar/retry_router.go  |  1 +
 5 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11e72a7..2f1a056 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1505,6 +1505,12 @@ func DLQWithProducerOptions(t *testing.T, prodOpt 
*ProducerOptions) {
 
                expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
                assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+               // check original messageId
+               assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+
+               // check original topic
+               assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
        }
 
        // No more messages on the DLQ
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 000faaa..5ecd8f8 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -92,11 +92,21 @@ func (r *dlqRouter) run() {
                        producer := 
r.getProducer(cm.Consumer.(*consumer).options.Schema)
                        msg := cm.Message.(*message)
                        msgID := msg.ID()
+
+                       // properties associated with original message
+                       properties := msg.Properties()
+
+                       // include orinal message id in string format in 
properties
+                       properties[PropertyOriginMessageID] = msgID.String()
+
+                       // include original topic name of the message in 
properties
+                       properties[SysPropertyRealTopic] = msg.Topic()
+
                        producer.SendAsync(context.Background(), 
&ProducerMessage{
                                Payload:             msg.Payload(),
                                Key:                 msg.Key(),
                                OrderingKey:         msg.OrderingKey(),
-                               Properties:          msg.Properties(),
+                               Properties:          properties,
                                EventTime:           msg.EventTime(),
                                ReplicationClusters: msg.replicationClusters,
                        }, func(MessageID, *ProducerMessage, error) {
diff --git a/pulsar/message.go b/pulsar/message.go
index 7f2e07f..76d0176 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -154,6 +154,9 @@ type MessageID interface {
 
        // PartitionIdx returns the message partitionIdx
        PartitionIdx() int32
+
+       // String returns message id in string format
+       String() string
 }
 
 // DeserializeMessageID reconstruct a MessageID object from its serialized 
representation
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 07e7ed3..53bd459 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -430,6 +430,14 @@ func (id *myMessageID) PartitionIdx() int32 {
        return id.PartitionIdx()
 }
 
+func (id *myMessageID) String() string {
+       mid, err := DeserializeMessageID(id.data)
+       if err != nil {
+               return ""
+       }
+       return fmt.Sprintf("%d:%d:%d", mid.LedgerID(), mid.EntryID(), 
mid.PartitionIdx())
+}
+
 func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 7b5f6b8..75792ad 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -35,6 +35,7 @@ const (
        SysPropertyRetryTopic      = "RETRY_TOPIC"
        SysPropertyReconsumeTimes  = "RECONSUMETIMES"
        SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
+       PropertyOriginMessageID    = "ORIGIN_MESSAGE_ID"
 )
 
 type RetryMessage struct {

Reply via email to