This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new cf031b8 add messageId and topic as props of DLQ message (#907)
cf031b8 is described below
commit cf031b8951aec1356f2e064f1bf679d24c81ccd0
Author: Garule Prabhudas <[email protected]>
AuthorDate: Tue Jan 10 17:46:33 2023 +0530
add messageId and topic as props of DLQ message (#907)
Co-authored-by: Prabhudas Garule <[email protected]>
---
pulsar/consumer_test.go | 6 ++++++
pulsar/dlq_router.go | 12 +++++++++++-
pulsar/message.go | 3 +++
pulsar/reader_test.go | 8 ++++++++
pulsar/retry_router.go | 1 +
5 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11e72a7..2f1a056 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1505,6 +1505,12 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+ // check original messageId
+ assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+
+ // check original topic
+ assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
}
// No more messages on the DLQ
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 000faaa..5ecd8f8 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -92,11 +92,21 @@ func (r *dlqRouter) run() {
producer :=
r.getProducer(cm.Consumer.(*consumer).options.Schema)
msg := cm.Message.(*message)
msgID := msg.ID()
+
+ // properties associated with original message
+ properties := msg.Properties()
+
+ // include original message id in string format in
properties
+ properties[PropertyOriginMessageID] = msgID.String()
+
+ // include original topic name of the message in
properties
+ properties[SysPropertyRealTopic] = msg.Topic()
+
producer.SendAsync(context.Background(),
&ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
- Properties: msg.Properties(),
+ Properties: properties,
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(MessageID, *ProducerMessage, error) {
diff --git a/pulsar/message.go b/pulsar/message.go
index 7f2e07f..76d0176 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -154,6 +154,9 @@ type MessageID interface {
// PartitionIdx returns the message partitionIdx
PartitionIdx() int32
+
+ // String returns message id in string format
+ String() string
}
// DeserializeMessageID reconstruct a MessageID object from its serialized
representation
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 07e7ed3..53bd459 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -430,6 +430,14 @@ func (id *myMessageID) PartitionIdx() int32 {
return id.PartitionIdx()
}
+func (id *myMessageID) String() string {
+ mid, err := DeserializeMessageID(id.data)
+ if err != nil {
+ return ""
+ }
+ return fmt.Sprintf("%d:%d:%d", mid.LedgerID(), mid.EntryID(),
mid.PartitionIdx())
+}
+
func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 7b5f6b8..75792ad 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -35,6 +35,7 @@ const (
SysPropertyRetryTopic = "RETRY_TOPIC"
SysPropertyReconsumeTimes = "RECONSUMETIMES"
SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
+ PropertyOriginMessageID = "ORIGIN_MESSAGE_ID"
)
type RetryMessage struct {