This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fef6a98 [improve][client] Implement GetLastMSgID for Reader (#1087)
7fef6a98 is described below

commit 7fef6a985b2a880bd47948afe0bc546ffc0c958d
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Sep 13 14:52:40 2023 +0800

    [improve][client] Implement GetLastMSgID for Reader (#1087)
    
    ### Motivation
    Implement the GetLastMessageID API for Reader.
    
    ---------
    
    Co-authored-by: Yunze Xu <[email protected]>
---
 pulsar/reader.go      |  3 +++
 pulsar/reader_impl.go |  4 ++++
 pulsar/reader_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 49 insertions(+)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index d58d06f6..5e1a73b9 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -134,4 +134,7 @@ type Reader interface {
        //            the message publish time where to reposition the subscription
        //
        SeekByTime(time time.Time) error
+
+       // GetLastMessageID returns the ID of the last message available for consumption.
+       GetLastMessageID() (MessageID, error)
 }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 5a2128a3..ffc92ded 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -244,3 +244,7 @@ func (r *reader) SeekByTime(time time.Time) error {
 
        return r.pc.SeekByTime(time)
 }
+
+// GetLastMessageID returns the ID of the last message available for
+// consumption, as reported by r.pc.getLastMessageID.
+//
+// On failure it returns an untyped nil MessageID together with the error.
+// NOTE(review): getLastMessageID is defined outside this hunk; if it returns
+// a concrete pointer type, forwarding its results directly would wrap a nil
+// pointer in a non-nil MessageID interface, hence the explicit nil below.
+func (r *reader) GetLastMessageID() (MessageID, error) {
+       id, err := r.pc.getLastMessageID()
+       if err != nil {
+               return nil, err
+       }
+       return id, nil
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 0a1b2a1d..ec10f8f1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -901,3 +901,45 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
        partitionConsumerImp.reconnectToBroker()
        assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
 }
+
+// TestReaderGetLastMessageID publishes ten unbatched messages to a fresh topic
+// and checks that Reader.GetLastMessageID reports the same ledger and entry
+// IDs as the last message ID returned by the producer.
+func TestReaderGetLastMessageID(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       // Setup failures are fatal: continuing would dereference a nil client.
+       if err != nil {
+               t.Fatalf("create client: %v", err)
+       }
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+       schema := NewStringSchema(nil)
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+               Schema:          schema,
+       })
+       if err != nil {
+               t.Fatalf("create producer: %v", err)
+       }
+       defer producer.Close()
+
+       var lastMsgID MessageID
+       // send 10 messages, remembering the ID of the most recent one
+       for i := 0; i < 10; i++ {
+               msgID, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               if err != nil {
+                       t.Fatalf("send message %d: %v", i, err)
+               }
+               assert.NotNil(t, msgID)
+               lastMsgID = msgID
+       }
+
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       if err != nil {
+               t.Fatalf("create reader: %v", err)
+       }
+       defer reader.Close()
+
+       // An error here must fail the test; returning early would report a
+       // pass without ever checking the result.
+       getLastMessageID, err := reader.GetLastMessageID()
+       if err != nil {
+               t.Fatalf("get last message ID: %v", err)
+       }
+
+       assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID())
+       assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID())
+}

Reply via email to