This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a03349a add LedgerId,EntryId,BatchIdx,PartitionIdx func for MessageId
interface (#529)
a03349a is described below
commit a03349ac0466a6c7cf595914f6d01d4425eb4aeb
Author: yorkxyzhang <[email protected]>
AuthorDate: Tue Jun 1 15:15:52 2021 +0800
add LedgerId,EntryId,BatchIdx,PartitionIdx func for MessageId interface
(#529)
* [feature] add LedgerId,EntryId,BatchIdx,PartitionIdx func for MessageId
interface
* [feature] add a new unit test for MessageId Get func
* [format] adjust format
---
pulsar/impl_message.go | 16 ++++++++++++++++
pulsar/impl_message_test.go | 9 +++++++++
pulsar/message.go | 12 ++++++++++++
pulsar/reader_test.go | 16 ++++++++++++++++
4 files changed, 53 insertions(+)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a84fc2d..c4f215c 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -123,6 +123,22 @@ func (id messageID) Serialize() []byte {
return data
}
+func (id messageID) LedgerID() int64 { // LedgerID returns the ledgerID field of this message ID.
+ return id.ledgerID
+}
+
+func (id messageID) EntryID() int64 { // EntryID returns the entryID field of this message ID.
+ return id.entryID
+}
+
+func (id messageID) BatchIdx() int32 { // BatchIdx returns the batchIdx field of this message ID.
+ return id.batchIdx
+}
+
+func (id messageID) PartitionIdx() int32 { // PartitionIdx returns the partitionIdx field of this message ID.
+ return id.partitionIdx
+}
+
func (id messageID) String() string { // String formats the ID as ledgerID:entryID:partitionIdx; batchIdx is not part of the output.
return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 4e8b644..3421ab4 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -45,6 +45,15 @@ func TestMessageId(t *testing.T) {
assert.Nil(t, id)
}
+func TestMessageIdGetFuncs(t *testing.T) {
+ // LedgerID, EntryID, BatchIdx and PartitionIdx must return the four values passed to newMessageID, in that order.
+ id := newMessageID(1, 2, 3, 4)
+ assert.Equal(t, int64(1), id.LedgerID())
+ assert.Equal(t, int64(2), id.EntryID())
+ assert.Equal(t, int32(3), id.BatchIdx())
+ assert.Equal(t, int32(4), id.PartitionIdx())
+}
+
func TestAckTracker(t *testing.T) {
tracker := newAckTracker(1)
assert.Equal(t, true, tracker.ack(0))
diff --git a/pulsar/message.go b/pulsar/message.go
index 55ecbcc..2a4343f 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -119,6 +119,18 @@ type Message interface {
type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte
+
+ // LedgerID returns the ledger ID of the message.
+ LedgerID() int64
+
+ // EntryID returns the entry ID of the message.
+ EntryID() int64
+
+ // BatchIdx returns the batch index (batchIdx) of the message.
+ BatchIdx() int32
+
+ // PartitionIdx returns the partition index (partitionIdx) of the message.
+ PartitionIdx() int32
}
// DeserializeMessageID reconstruct a MessageID object from its serialized representation
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index f72ba1d..618f5ab 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -391,6 +391,22 @@ func (id *myMessageID) Serialize() []byte {
return id.data
}
+// LedgerID satisfies the MessageID interface for this test helper type.
+// It returns a fixed zero rather than calling id.LedgerID(): a method that
+// calls itself on the same receiver never terminates and overflows the stack.
+// NOTE(review): 0 is a placeholder; decode id.data here if a test needs the real value.
+func (id *myMessageID) LedgerID() int64 {
+ return 0
+}
+
+// EntryID satisfies the MessageID interface for this test helper type.
+// It returns a fixed zero rather than calling id.EntryID(): a method that
+// calls itself on the same receiver never terminates and overflows the stack.
+// NOTE(review): 0 is a placeholder; decode id.data here if a test needs the real value.
+func (id *myMessageID) EntryID() int64 {
+ return 0
+}
+
+// BatchIdx satisfies the MessageID interface for this test helper type.
+// It returns a fixed zero rather than calling id.BatchIdx(): a method that
+// calls itself on the same receiver never terminates and overflows the stack.
+// NOTE(review): 0 is a placeholder; decode id.data here if a test needs the real value.
+func (id *myMessageID) BatchIdx() int32 {
+ return 0
+}
+
+// PartitionIdx satisfies the MessageID interface for this test helper type.
+// It returns a fixed zero rather than calling id.PartitionIdx(): a method that
+// calls itself on the same receiver never terminates and overflows the stack.
+// NOTE(review): 0 is a placeholder; decode id.data here if a test needs the real value.
+func (id *myMessageID) PartitionIdx() int32 {
+ return 0
+}
+
func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,