This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed8fea  [go client] add SubscriptionInitPos option in ConsumerOptions 
(#3588)
2ed8fea is described below

commit 2ed8fea94e3565ebe2ffc999412377cf920354c5
Author: 冉小龙 <[email protected]>
AuthorDate: Wed Feb 13 21:26:43 2019 +0800

    [go client] add SubscriptionInitPos option in ConsumerOptions (#3588)
    
    * [go client] add SubscriptionInitPos option in ConsumerOptions
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * add comment for SubscriptionInitPos option
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 pulsar-client-go/pulsar/c_consumer.go    |  4 +++
 pulsar-client-go/pulsar/consumer.go      | 14 ++++++++++
 pulsar-client-go/pulsar/consumer_test.go | 48 +++++++++++++++++++++++++++++++-
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index 6f333f1..9ca73e8 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -100,6 +100,10 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_configuration_set_consumer_type(conf, 
C.pulsar_consumer_type(options.Type))
        }
 
+       if options.SubscriptionInitPos != Latest {
+               C.pulsar_consumer_set_subscription_initial_position(conf, 
C.initial_position(options.SubscriptionInitPos))
+       }
+
        // ReceiverQueueSize==0 means to use the default queue size
        // -1 means to disable the consumer prefetching
        if options.ReceiverQueueSize > 0 {
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index d431daa..d7549c4 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -46,6 +46,16 @@ const (
        Failover
 )
 
+type InitialPosition int
+
+const (
+       // Latest position which means the start consuming position will be the 
last message
+       Latest InitialPosition = iota
+
+       // Earliest position which means the start consuming position will be 
the first message
+       Earliest
+)
+
 // ConsumerBuilder is used to configure and create instances of Consumer
 type ConsumerOptions struct {
        // Specify the topic this consumer will subscribe on.
@@ -77,6 +87,10 @@ type ConsumerOptions struct {
        // Default is `Exclusive`
        Type SubscriptionType
 
+       // InitialPosition at which the cursor will be set when subscribe
+       // Default is `Latest`
+       SubscriptionInitPos InitialPosition
+
        // Sets a `MessageChannel` for the consumer
        // When a message is received, it will be pushed to the channel for 
consumption
        MessageChannel chan ConsumerMessage
diff --git a/pulsar-client-go/pulsar/consumer_test.go 
b/pulsar-client-go/pulsar/consumer_test.go
index 963921e..e82ffe6 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -454,7 +454,53 @@ func TestConsumer_Seek(t *testing.T) {
        msg, err := consumer.Receive(ctx)
        assert.Nil(t, err)
        t.Logf("again received message:%+v", msg.ID())
-       assert.Equal(t,string(msg.Payload()),"msg-content-0")
+       assert.Equal(t, "msg-content-0", string(msg.Payload()))
 
        consumer.Unsubscribe()
 }
+
+func TestConsumer_SubscriptionInitPos(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := "persistent://public/default/testSeek"
+       subName := "test-subscription-initial-earliest-position"
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       //sent message
+       ctx := context.Background()
+
+       err = producer.Send(ctx, ProducerMessage{
+               Payload: []byte("msg-1-content-1"),
+       })
+       assert.Nil(t, err)
+
+       err = producer.Send(ctx, ProducerMessage{
+               Payload: []byte("msg-1-content-2"),
+       })
+       assert.Nil(t, err)
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topicName,
+               SubscriptionName:    subName,
+               SubscriptionInitPos: Earliest,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+
+       assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+}

Reply via email to