This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7fef6a98 [improve][client] Implement GetLastMSgID for Reader (#1087)
7fef6a98 is described below
commit 7fef6a985b2a880bd47948afe0bc546ffc0c958d
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Sep 13 14:52:40 2023 +0800
[improve][client] Implement GetLastMSgID for Reader (#1087)
### Motivation
Implement the GetLastMSgID API for Reader.
---------
Co-authored-by: Yunze Xu <[email protected]>
---
pulsar/reader.go | 3 +++
pulsar/reader_impl.go | 4 ++++
pulsar/reader_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 49 insertions(+)
diff --git a/pulsar/reader.go b/pulsar/reader.go
index d58d06f6..5e1a73b9 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -134,4 +134,7 @@ type Reader interface {
// the message publish time where to reposition the
subscription
//
SeekByTime(time time.Time) error
+
+ // GetLastMessageID get the last message id available for consume.
+ GetLastMessageID() (MessageID, error)
}
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 5a2128a3..ffc92ded 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -244,3 +244,7 @@ func (r *reader) SeekByTime(time time.Time) error {
return r.pc.SeekByTime(time)
}
+
+func (r *reader) GetLastMessageID() (MessageID, error) {
+ return r.pc.getLastMessageID()
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 0a1b2a1d..ec10f8f1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -901,3 +901,45 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
partitionConsumerImp.reconnectToBroker()
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
}
+
+func TestReaderGetLastMessageID(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ topic := newTopicName()
+ ctx := context.Background()
+ schema := NewStringSchema(nil)
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ Schema: schema,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ var lastMsgID MessageID
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msgID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, msgID)
+ lastMsgID = msgID
+ }
+
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessageID(),
+ })
+ assert.Nil(t, err)
+ getLastMessageID, err := reader.GetLastMessageID()
+ if err != nil {
+ return
+ }
+
+ assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID())
+ assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID())
+}