This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ccc5de  Add seek by time (#180)
1ccc5de is described below

commit 1ccc5de7cb497d49c0ee61f212bad5cce0c94d52
Author: 冉小龙 <[email protected]>
AuthorDate: Wed Feb 5 10:52:50 2020 +0800

    Add seek by time (#180)
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    ### Motivation
    
    - Add seek by time on consumer
    
    ### Modifications
    
    - Add seek by time on consumer
    - Add test case
---
 pulsar/consumer.go            | 10 ++++++++
 pulsar/consumer_impl.go       |  8 ++++++
 pulsar/consumer_multitopic.go |  5 ++++
 pulsar/consumer_partition.go  | 42 +++++++++++++++++++++++++++++++
 pulsar/consumer_regex.go      |  4 +++
 pulsar/consumer_test.go       | 56 ++++++++++++++++++++++++++++++++++++++++++
 pulsar/internal/utils.go      | 34 ++++++++++++++++++++++++++
 pulsar/internal/utils_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 216 insertions(+)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index e832b21..18eced8 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -174,4 +174,14 @@ type Consumer interface {
        // Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the
        //       seek() on the individual partitions.
        Seek(MessageID) error
+
+       // SeekByTime resets the subscription associated with this consumer to a
+       // specific message publish time.
+       //
+       // Note: this operation can only be done on non-partitioned topics. For
+       // partitioned topics, perform the seek on the individual partitions
+       // instead.
+       //
+       // The time argument is the message publish time to which the
+       // subscription is repositioned.
+       SeekByTime(time time.Time) error
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 095050f..da98b1d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -318,6 +318,14 @@ func (c *consumer) Seek(msgID MessageID) error {
        return c.consumers[mid.partitionIdx].Seek(mid)
 }
 
+// SeekByTime forwards the seek-by-publish-time request to the single
+// underlying partition consumer. When more than one partition consumer is
+// present the call is rejected with an error instead.
+func (c *consumer) SeekByTime(time time.Time) error {
+       // One underlying consumer: delegate straight to it.
+       if len(c.consumers) <= 1 {
+               return c.consumers[0].SeekByTime(time)
+       }
+
+       // Several partition consumers: the seek has to be issued per partition.
+       return errors.New("for partition topic, seek command should perform on the individual partitions")
+}
+
 var r = &random{
        R: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 6ba3a4a..7e9994d 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "sync"
+       "time"
 
        pkgerrors "github.com/pkg/errors"
 
@@ -165,3 +166,7 @@ func (c *multiTopicConsumer) Close() {
 func (c *multiTopicConsumer) Seek(msgID MessageID) error {
        return errors.New("seek command not allowed for multi topic consumer")
 }
+
+// SeekByTime is not supported by the multi-topic consumer: it ignores the
+// requested time and always returns an error.
+func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
+       return errors.New("seek command not allowed for multi topic consumer")
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 185f351..c252e74 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -297,6 +297,40 @@ func (pc *partitionConsumer) internalSeek(seek 
*seekRequest) {
        }
 }
 
+// SeekByTime posts a seek-by-publish-time request on the events channel and
+// blocks until the request's done channel is closed. It returns the error
+// recorded on the request, or nil when none was recorded.
+func (pc *partitionConsumer) SeekByTime(time time.Time) error {
+       done := make(chan struct{})
+       request := &seekByTimeRequest{publishTime: time, doneCh: done}
+
+       // Hand the request over for processing ...
+       pc.eventsCh <- request
+       // ... and wait for the completion signal.
+       <-done
+
+       return request.err
+}
+
+// internalSeekByTime handles a seekByTimeRequest: it sends a SEEK command
+// carrying the requested publish time over the consumer's connection and
+// stores any RPC error on the request. seek.doneCh is always closed on
+// return, which releases the caller waiting on it.
+func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
+       defer close(seek.doneCh)
+
+       if pc.state == consumerClosing || pc.state == consumerClosed {
+               // NOTE(review): seek.err stays nil on this path, so the caller sees
+               // success although nothing was sent - confirm this is intended.
+               pc.log.Error("Consumer was already closed")
+               return
+       }
+
+       // The broker reads the publish time of a seek command as milliseconds
+       // since the Unix epoch, so convert explicitly instead of sending seconds.
+       publishTimeMillis := seek.publishTime.UnixNano() / int64(time.Millisecond)
+
+       requestID := pc.client.rpcClient.NewRequestID()
+       cmdSeek := &pb.CommandSeek{
+               ConsumerId:         proto.Uint64(pc.consumerID),
+               RequestId:          proto.Uint64(requestID),
+               MessagePublishTime: proto.Uint64(uint64(publishTimeMillis)),
+       }
+
+       _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to reset to message publish time")
+               seek.err = err
+       }
+}
+
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
        msgID := req.msgID
 
@@ -557,6 +591,12 @@ type seekRequest struct {
        err    error
 }
 
+// seekByTimeRequest is the event sent on the events channel to request a seek
+// to a message publish time.
+type seekByTimeRequest struct {
+       // doneCh is closed by the handler once the request has been processed.
+       doneCh      chan struct{}
+       // publishTime is the message publish time to seek to.
+       publishTime time.Time
+       // err is set by the handler when the seek RPC fails.
+       err         error
+}
+
 func (pc *partitionConsumer) runEventsLoop() {
        defer func() {
                pc.log.Debug("exiting events loop")
@@ -577,6 +617,8 @@ func (pc *partitionConsumer) runEventsLoop() {
                                pc.internalGetLastMessageID(v)
                        case *seekRequest:
                                pc.internalSeek(v)
+                       case *seekByTimeRequest:
+                               pc.internalSeekByTime(v)
                        case *connectionClosed:
                                pc.reconnectToBroker()
                        case *closeRequest:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 4043380..1518af6 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -215,6 +215,10 @@ func (c *regexConsumer) Seek(msgID MessageID) error {
        return errors.New("seek command not allowed for regex consumer")
 }
 
+// SeekByTime is not supported by the regex consumer: it ignores the requested
+// time and always returns an error.
+func (c *regexConsumer) SeekByTime(time time.Time) error {
+       return errors.New("seek command not allowed for regex consumer")
+}
+
 func (c *regexConsumer) closed() bool {
        select {
        case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2ba76e8..d05b8ca 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -25,6 +25,7 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/stretchr/testify/assert"
 )
 
@@ -819,6 +820,61 @@ func TestConsumerSeek(t *testing.T) {
        assert.Equal(t, "hello-4", string(msg.Payload()))
 }
 
+// TestConsumerSeekByTime publishes N messages, receives and acknowledges all
+// of them, seeks the subscription to 100 seconds before the current time and
+// then expects the same N messages to be delivered again in order.
+func TestConsumerSeekByTime(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "my-sub",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       const N = 10
+       resetTimeStr := "100s"
+       retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
+       assert.Nil(t, err)
+
+       // Publish exactly N messages so the count matches both receive loops.
+       for i := 0; i < N; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       // First pass: drain and acknowledge everything that was published.
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+               consumer.Ack(msg)
+       }
+
+       // Rewind the subscription to a point in time before the messages were sent.
+       currentTimestamp := time.Now()
+       err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
+       assert.Nil(t, err)
+
+       // Second pass: the same messages must be redelivered in the same order.
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+               consumer.Ack(msg)
+       }
+}
+
 func TestConsumerMetadata(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index f0d5fb3..0763ced 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -18,8 +18,12 @@
 package internal
 
 import (
+       "strconv"
+       "strings"
        "sync/atomic"
        "time"
+
+       "github.com/pkg/errors"
 )
 
 // TimestampMillis return a time unix nano.
@@ -36,3 +40,33 @@ func GetAndAdd(n *uint64, diff uint64) uint64 {
                }
        }
 }
+
+// ParseRelativeTimeInSeconds converts a relative time expression, written as
+// an integer followed by a one-letter unit (for example "100s" or "2h"), into
+// a time.Duration.
+//
+// Supported units, matched case-insensitively: s (seconds), m (minutes),
+// h (hours), d (days), w (weeks) and y (years, counted as 365 days).
+//
+// It returns -1 and an error when relativeTime is empty, when the part before
+// the unit is not a base-10 integer, or when the unit is not recognized.
+func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error) {
+       if relativeTime == "" {
+               return -1, errors.New("time can not be empty")
+       }
+
+       // The last byte is the unit; everything before it is the numeric value.
+       unitTime := relativeTime[len(relativeTime)-1:]
+       t := relativeTime[:len(relativeTime)-1]
+       timeValue, err := strconv.ParseInt(t, 10, 64)
+       if err != nil {
+               return -1, errors.Errorf("invalid time '%s'", t)
+       }
+
+       switch strings.ToLower(unitTime) {
+       case "s":
+               return time.Duration(timeValue) * time.Second, nil
+       case "m":
+               return time.Duration(timeValue) * time.Minute, nil
+       case "h":
+               return time.Duration(timeValue) * time.Hour, nil
+       case "d":
+               return time.Duration(timeValue) * time.Hour * 24, nil
+       case "w":
+               return time.Duration(timeValue) * time.Hour * 24 * 7, nil
+       case "y":
+               // One year is 365 days; an extra factor of 7 would make it 7 years.
+               return time.Duration(timeValue) * time.Hour * 24 * 365, nil
+       default:
+               return -1, errors.Errorf("invalid time unit '%s'", unitTime)
+       }
+}
diff --git a/pulsar/internal/utils_test.go b/pulsar/internal/utils_test.go
new file mode 100644
index 0000000..65babd0
--- /dev/null
+++ b/pulsar/internal/utils_test.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestParseRelativeTimeInSeconds(t *testing.T) {
+       testSecondStr := "10s"
+       timestamp, err := ParseRelativeTimeInSeconds(testSecondStr)
+       assert.Nil(t, err)
+       assert.Equal(t, time.Duration(10)*time.Second, timestamp)
+
+       testMinuteStr := "10m"
+       timestamp, err = ParseRelativeTimeInSeconds(testMinuteStr)
+       assert.Nil(t, err)
+       assert.Equal(t, time.Duration(10)*time.Minute, timestamp)
+
+       testHourStr := "10h"
+       timestamp, err = ParseRelativeTimeInSeconds(testHourStr)
+       assert.Nil(t, err)
+       assert.Equal(t, time.Duration(10)*time.Hour, timestamp)
+
+       testDaysStr := "10d"
+       timestamp, err = ParseRelativeTimeInSeconds(testDaysStr)
+       assert.Nil(t, err)
+       assert.Equal(t, time.Duration(10)*time.Hour*24, timestamp)
+
+       testWeekStr := "10w"
+       timestamp, err = ParseRelativeTimeInSeconds(testWeekStr)
+       assert.Nil(t, err)
+       assert.Equal(t, time.Duration(10)*time.Hour*24*7, timestamp)
+
+       testYearStr := "10y"
+       timestamp, err = ParseRelativeTimeInSeconds(testYearStr)
+       assert.Nil(t, err)
+       assert.Equal(t, time.Duration(10)*time.Hour*24*365, timestamp)
+}

Reply via email to