This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f7041f  [Issue 781][add consumer seek by time on partitioned topic]  
(#782)
0f7041f is described below

commit 0f7041ffa9085197aa888ac33d3288a3ed81c57b
Author: Garule Prabhudas <[email protected]>
AuthorDate: Fri Jun 24 02:54:49 2022 +0530

    [Issue 781][add consumer seek by time on partitioned topic]  (#782)
    
    * seek on every partition of topic and check for error
    
    * use array to store errors instead of channels
    
    * add test case to test seek by time on partitioned topic
    
    * wrap errors
    
    * refactor method
    
    Co-authored-by: PGarule <[email protected]>
---
 pulsar/consumer_impl.go | 13 ++++++---
 pulsar/consumer_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e887538..2328ca8 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
+       pkgerrors "github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -589,11 +590,15 @@ func (c *consumer) Seek(msgID MessageID) error {
 func (c *consumer) SeekByTime(time time.Time) error {
        c.Lock()
        defer c.Unlock()
-       if len(c.consumers) > 1 {
-               return newError(SeekFailed, "for partition topic, seek command 
should perform on the individual partitions")
+       var errs error
+       // run SeekByTime on every partition of topic
+       for _, cons := range c.consumers {
+               if err := cons.SeekByTime(time); err != nil {
+                       msg := fmt.Sprintf("unable to SeekByTime for topic=%s 
subscription=%s", c.topic, c.Subscription())
+                       errs = pkgerrors.Wrap(newError(SeekFailed, 
err.Error()), msg)
+               }
        }
-
-       return c.consumers[0].SeekByTime(time)
+       return errs
 }
 
 var r = &random{
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0366884..20d4290 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3052,3 +3052,76 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) 
{
        assert.NotNil(t, msg)
        consumer.Ack(msg)
 }
+
+// TestConsumerSeekByTimeOnPartitionedTopic test seek by time on partitioned 
topic.
+// It is based on existing test case [TestConsumerSeekByTime] but for 
partitioned topic.
+func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // Create topic with 5 partitions
+       topicAdminURL := 
"admin/v2/persistent/public/default/TestSeekByTimeOnPartitionedTopic/partitions"
+       err = httpPut(topicAdminURL, 5)
+       defer httpDelete(topicAdminURL)
+       assert.Nil(t, err)
+
+       topicName := 
"persistent://public/default/TestSeekByTimeOnPartitionedTopic"
+
+       partitions, err := client.TopicPartitions(topicName)
+       assert.Nil(t, err)
+       assert.Equal(t, len(partitions), 5)
+       for i := 0; i < 5; i++ {
+               assert.Equal(t, partitions[i],
+                       fmt.Sprintf("%s-partition-%d", topicName, i))
+       }
+
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "my-sub",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // Use value bigger than 1000 to full-fill queue channel with size 1000 
and message channel with size 10
+       const N = 1100
+       resetTimeStr := "100s"
+       retentionTimeInSecond, err := 
internal.ParseRelativeTimeInSeconds(resetTimeStr)
+       assert.Nil(t, err)
+
+       for i := 0; i < N; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       // Don't consume all messages so some stay in queues
+       for i := 0; i < N-20; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               consumer.Ack(msg)
+       }
+
+       currentTimestamp := time.Now()
+       err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
+       assert.Nil(t, err)
+
+       // should be able to consume all messages once again
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               consumer.Ack(msg)
+       }
+}

Reply via email to