This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f41441f  [Issue-148][pulsar-client-go] add seek by messageID (#168)
f41441f is described below

commit f41441fa89edabda47ecca92cdc486d0c7927b03
Author: steven.wang <[email protected]>
AuthorDate: Mon Feb 3 17:58:26 2020 +1300

    [Issue-148][pulsar-client-go] add seek by messageID (#168)
    
    ### Motivation
    
    Add seek by messageID
---
 pulsar/consumer.go            |  7 +++++
 pulsar/consumer_impl.go       | 56 +++++++++++++++++++++++++---------------
 pulsar/consumer_multitopic.go |  5 ++++
 pulsar/consumer_partition.go  | 49 +++++++++++++++++++++++++++++++++++
 pulsar/consumer_regex.go      |  5 ++++
 pulsar/consumer_test.go       | 59 ++++++++++++++++++++++++++++++++++++++++++-
 pulsar/internal/connection.go |  1 +
 7 files changed, 160 insertions(+), 22 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index af69471..e832b21 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -167,4 +167,11 @@ type Consumer interface {
 
        // Close the consumer and stop the broker to push more messages
        Close()
+
+       // Reset the subscription associated with this consumer to a specific 
message id.
+       // The message id can either be a specific message or represent the 
first or last messages in the topic.
+       //
+       // Note: this operation can only be done on non-partitioned topics. For
+       // partitioned topics, perform the seek() on the individual partitions instead.
+       Seek(MessageID) error
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ede02ef..095050f 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -258,9 +258,8 @@ func (c *consumer) Ack(msg Message) {
 
 // Ack the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) {
-       mid, ok := msgID.(*messageID)
+       mid, ok := c.messageID(msgID)
        if !ok {
-               c.log.Warnf("invalid message id type")
                return
        }
 
@@ -269,14 +268,7 @@ func (c *consumer) AckID(msgID MessageID) {
                return
        }
 
-       partition := mid.partitionIdx
-       // did we receive a valid partition index?
-       if partition < 0 || partition >= len(c.consumers) {
-               c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
-                       partition, len(c.consumers))
-               return
-       }
-       c.consumers[partition].AckID(mid)
+       c.consumers[mid.partitionIdx].AckID(mid)
 }
 
 func (c *consumer) Nack(msg Message) {
@@ -284,9 +276,8 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-       mid, ok := msgID.(*messageID)
+       mid, ok := c.messageID(msgID)
        if !ok {
-               c.log.Warnf("invalid message id type")
                return
        }
 
@@ -295,15 +286,7 @@ func (c *consumer) NackID(msgID MessageID) {
                return
        }
 
-       partition := mid.partitionIdx
-       // did we receive a valid partition index?
-       if partition < 0 || partition >= len(c.consumers) {
-               c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
-                       partition, len(c.consumers))
-               return
-       }
-
-       c.consumers[partition].NackID(mid)
+       c.consumers[mid.partitionIdx].NackID(mid)
 }
 
 func (c *consumer) Close() {
@@ -322,6 +305,19 @@ func (c *consumer) Close() {
        })
 }
 
+func (c *consumer) Seek(msgID MessageID) error {
+       if len(c.consumers) > 1 {
+               return errors.New("for partition topic, seek command should 
perform on the individual partitions")
+       }
+
+       mid, ok := c.messageID(msgID)
+       if !ok {
+               return nil
+       }
+
+       return c.consumers[mid.partitionIdx].Seek(mid)
+}
+
 var r = &random{
        R: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
@@ -367,3 +363,21 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) 
pb.CommandSubscribe_I
 
        return pb.CommandSubscribe_Latest
 }
+
+func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
+       mid, ok := msgID.(*messageID)
+       if !ok {
+               c.log.Warnf("invalid message id type")
+               return nil, false
+       }
+
+       partition := mid.partitionIdx
+       // did we receive a valid partition index?
+       if partition < 0 || partition >= len(c.consumers) {
+               c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
+                       partition, len(c.consumers))
+               return nil, false
+       }
+
+       return mid, true
+}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 04a8423..6ba3a4a 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "sync"
 
@@ -160,3 +161,7 @@ func (c *multiTopicConsumer) Close() {
                close(c.closeCh)
        })
 }
+
+// Seek is not supported by the multi topic consumer; it always returns an
+// error and ignores msgID.
+func (c *multiTopicConsumer) Seek(msgID MessageID) error {
+       return errors.New("seek command not allowed for multi topic consumer")
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ae07e76..185f351 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -256,6 +256,47 @@ func (pc *partitionConsumer) Close() {
        <-req.doneCh
 }
 
+// Seek queues a seekRequest for msgID on pc.eventsCh and blocks until the
+// request's doneCh is closed, then returns the error recorded on the request.
+func (pc *partitionConsumer) Seek(msgID *messageID) error {
+       done := make(chan struct{})
+       request := &seekRequest{msgID: msgID, doneCh: done}
+
+       // Hand the request over, then wait for it to be handled.
+       pc.eventsCh <- request
+       <-done
+
+       return request.err
+}
+
+func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
+       defer close(seek.doneCh)
+
+       if pc.state == consumerClosing || pc.state == consumerClosed {
+               pc.log.Error("Consumer was already closed")
+               return
+       }
+
+       id := &pb.MessageIdData{}
+       err := proto.Unmarshal(seek.msgID.Serialize(), id)
+       if err != nil {
+               pc.log.WithError(err).Errorf("deserialize message id error: 
%s", err.Error())
+               seek.err = err
+       }
+
+       requestID := pc.client.rpcClient.NewRequestID()
+       cmdSeek := &pb.CommandSeek{
+               ConsumerId: proto.Uint64(pc.consumerID),
+               RequestId:  proto.Uint64(requestID),
+               MessageId:  id,
+       }
+
+       _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, 
pb.BaseCommand_SEEK, cmdSeek)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to reset to message id")
+               seek.err = err
+       }
+}
+
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
        msgID := req.msgID
 
@@ -510,6 +551,12 @@ type getLastMsgIDRequest struct {
        err    error
 }
 
+// seekRequest is the event processed by internalSeek to reset the
+// subscription to a given message ID.
+type seekRequest struct {
+       doneCh chan struct{} // closed by internalSeek once the request has been handled
+       msgID  *messageID    // message ID to seek to
+       err    error         // failure recorded while handling the request, if any
+}
+
 func (pc *partitionConsumer) runEventsLoop() {
        defer func() {
                pc.log.Debug("exiting events loop")
@@ -528,6 +575,8 @@ func (pc *partitionConsumer) runEventsLoop() {
                                pc.internalUnsubscribe(v)
                        case *getLastMsgIDRequest:
                                pc.internalGetLastMessageID(v)
+                       case *seekRequest:
+                               pc.internalSeek(v)
                        case *connectionClosed:
                                pc.reconnectToBroker()
                        case *closeRequest:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 1d2f157..4043380 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "regexp"
        "strings"
@@ -210,6 +211,10 @@ func (c *regexConsumer) Close() {
        })
 }
 
+// Seek is not supported by the regex consumer; it always returns an error and
+// ignores msgID.
+func (c *regexConsumer) Seek(msgID MessageID) error {
+       return errors.New("seek command not allowed for regex consumer")
+}
+
 func (c *regexConsumer) closed() bool {
        select {
        case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 52ca7c2..2ba76e8 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -146,8 +146,9 @@ func TestBatchMessageReceive(t *testing.T) {
                SubscriptionName: subName,
        })
        assert.Nil(t, err)
-       count := 0
+       defer consumer.Close()
 
+       count := 0
        for i := 0; i < numOfMessages; i++ {
                messageContent := prefix + fmt.Sprintf("%d", i)
                msg := &ProducerMessage{
@@ -534,6 +535,7 @@ func TestConsumerFlow(t *testing.T) {
                ReceiverQueueSize: 4,
        })
        assert.Nil(t, err)
+       defer consumer.Close()
 
        for msgNum := 0; msgNum < 100; msgNum++ {
                if _, err := producer.Send(ctx, &ProducerMessage{
@@ -640,6 +642,7 @@ func TestConsumerNack(t *testing.T) {
                NackRedeliveryDelay: 1 * time.Second,
        })
        assert.Nil(t, err)
+       defer consumer.Close()
 
        const N = 100
 
@@ -700,6 +703,7 @@ func TestConsumerCompression(t *testing.T) {
                SubscriptionName: "sub-1",
        })
        assert.Nil(t, err)
+       defer consumer.Close()
 
        const N = 100
 
@@ -743,6 +747,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
                SubscriptionName: "sub-1",
        })
        assert.Nil(t, err)
+       defer consumer.Close()
 
        const N = 100
 
@@ -762,6 +767,58 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
        }
 }
 
+// TestConsumerSeek publishes N messages, receives and acknowledges all of
+// them, then seeks back to the ID of the message at seekIndex and checks that
+// the next message received carries that message's payload.
+func TestConsumerSeek(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       const (
+               N         = 10 // number of messages published and consumed
+               seekIndex = 4  // index of the message whose ID is used for the seek
+       )
+
+       // Publish N messages, remembering the ID of the one to seek back to.
+       var seekID MessageID
+       for i := 0; i < N; i++ {
+               id, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+
+               if i == seekIndex {
+                       seekID = id
+               }
+       }
+
+       // Consume and acknowledge everything before seeking.
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+               consumer.Ack(msg)
+       }
+
+       err = consumer.Seek(seekID)
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.Equal(t, fmt.Sprintf("hello-%d", seekIndex), string(msg.Payload()))
+}
+
 func TestConsumerMetadata(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index b31e70f..dace305 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -426,6 +426,7 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
 
        case pb.BaseCommand_CLOSE_PRODUCER:
                c.handleCloseProducer(cmd.GetCloseProducer())
+
        case pb.BaseCommand_CLOSE_CONSUMER:
                c.handleCloseConsumer(cmd.GetCloseConsumer())
 

Reply via email to