This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b3c0f  Add seek logic for reader (#356)
f9b3c0f is described below

commit f9b3c0f078dd4e3bf294fe39b03523203c25e990
Author: 冉小龙 <[email protected]>
AuthorDate: Mon Aug 24 14:05:18 2020 +0800

    Add seek logic for reader (#356)
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    ### Motivation
    
    
    Follow up on https://github.com/apache/pulsar-client-go/pull/222 and add the
    seek logic for the reader.
    
    ### Modifications
    
    - Add `seek by msgID` interface
    - Add `seek by time` interface
    - Add test case
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_partition.go  |  8 +++---
 pulsar/internal/connection.go |  2 +-
 pulsar/reader.go              | 22 ++++++++++++++-
 pulsar/reader_impl.go         | 38 ++++++++++++++++++++++++++
 pulsar/reader_test.go         | 63 ++++++++++++++++++++++++++++++++++++++++---
 5 files changed, 123 insertions(+), 10 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a36cd..4d10521 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -207,7 +207,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                if msgID.entryID != noMessageEntry {
                        pc.startMessageID = msgID
 
-                       err = pc.requestSeek(msgID)
+                       err = pc.requestSeek(msgID.messageID)
                        if err != nil {
                                return nil, err
                        }
@@ -276,7 +276,7 @@ func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest)
        req.msgID, req.err = pc.requestGetLastMessageID()
 }
 
-func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
+func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, 
error) {
        requestID := pc.client.rpcClient.NewRequestID()
        cmdGetLastMessageID := &pb.CommandGetLastMessageId{
                RequestId:  proto.Uint64(requestID),
@@ -286,7 +286,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() 
(messageID, error) {
                pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
        if err != nil {
                pc.log.WithError(err).Error("Failed to get last message id")
-               return messageID{}, err
+               return trackingMessageID{}, err
        }
        id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
        return convertToMessageID(id), nil
@@ -365,7 +365,7 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) 
error {
 
 func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
        defer close(seek.doneCh)
-       seek.err = pc.requestSeek(seek.msgID)
+       seek.err = pc.requestSeek(seek.msgID.messageID)
 }
 
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8de1ad5..1bfec52 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -665,8 +665,8 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
 }
 
 func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer) {
-       c.log.Infof("Broker notification of Closed consumer: %d", 
closeConsumer.GetConsumerId())
        consumerID := closeConsumer.GetConsumerId()
+       c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
 
        c.Lock()
        defer c.Unlock()
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 8fe99f8..40234aa 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "context"
+import (
+       "context"
+       "time"
+)
 
 // ReaderMessage package Reader and Message as a struct to use
 type ReaderMessage struct {
@@ -88,4 +91,21 @@ type Reader interface {
 
        // Close the reader and stop the broker to push more messages
        Close()
+
+       // Reset the subscription associated with this reader to a specific 
message id.
+       // The message id can either be a specific message or represent the 
first or last messages in the topic.
+       //
+       // Note: this operation can only be done on non-partitioned topics. For
+       //       partitioned topics, perform the seek() on the individual partitions instead.
+       Seek(MessageID) error
+
+       // Reset the subscription associated with this reader to a specific 
message publish time.
+       //
+       // Note: this operation can only be done on non-partitioned topics. For
+       // partitioned topics, perform the seek() on the individual partitions instead.
+       //
+       // @param timestamp
+       //            the message publish time where to reposition the 
subscription
+       //
+       SeekByTime(time time.Time) error
 }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 474d0db..8083b06 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -20,6 +20,7 @@ package pulsar
 import (
        "context"
        "fmt"
+       "sync"
        "time"
 
        "github.com/prometheus/client_golang/prometheus"
@@ -45,6 +46,7 @@ var (
 )
 
 type reader struct {
+       sync.Mutex
        pc                  *partitionConsumer
        messageCh           chan ConsumerMessage
        lastMessageInBroker trackingMessageID
@@ -187,3 +189,39 @@ func (r *reader) Close() {
        r.pc.Close()
        readersClosed.Inc()
 }
+
+func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
+       mid, ok := toTrackingMessageID(msgID)
+       if !ok {
+               r.log.Warnf("invalid message id type %T", msgID)
+               return trackingMessageID{}, false
+       }
+
+       partition := int(mid.partitionIdx)
+       // did we receive a valid partition index?
+       if partition < 0 {
+               r.log.Warnf("invalid partition index %d expected", partition)
+               return trackingMessageID{}, false
+       }
+
+       return mid, true
+}
+
+func (r *reader) Seek(msgID MessageID) error {
+       r.Lock()
+       defer r.Unlock()
+
+       mid, ok := r.messageID(msgID)
+       if !ok {
+               return nil
+       }
+
+       return r.pc.Seek(mid)
+}
+
+func (r *reader) SeekByTime(time time.Time) error {
+       r.Lock()
+       defer r.Unlock()
+
+       return r.pc.SeekByTime(time)
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 08b949e..793dc8d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -447,6 +447,65 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t 
*testing.T) {
        }
 }
 
+func TestReaderSeek(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topicName,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       const N = 10
+       var seekID MessageID
+       for i := 0; i < N; i++ {
+               id, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+
+               if i == 4 {
+                       seekID = id
+               }
+       }
+       err = producer.Flush()
+       assert.NoError(t, err)
+
+       for i := 0; i < N; i++ {
+               msg, err := reader.Next(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("hello-%d", i), 
string(msg.Payload()))
+       }
+
+       err = reader.Seek(seekID)
+       assert.Nil(t, err)
+
+       readerOfSeek, err := client.CreateReader(ReaderOptions{
+               Topic:                   topicName,
+               StartMessageID:          seekID,
+               StartMessageIDInclusive: true,
+       })
+       assert.Nil(t, err)
+
+       msg, err := readerOfSeek.Next(ctx)
+       assert.Nil(t, err)
+       assert.Equal(t, "hello-4", string(msg.Payload()))
+}
+
 func TestReaderLatestInclusiveHasNext(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
@@ -498,14 +557,10 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
        assert.Nil(t, err)
        defer reader.Close()
 
-       var msgID MessageID
        if reader.HasNext() {
                msg, err := reader.Next(context.Background())
                assert.NoError(t, err)
 
                assert.Equal(t, []byte("hello-9"), msg.Payload())
-               msgID = msg.ID()
        }
-
-       assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
 }

Reply via email to