This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9917784  [go-client] support consumer seek (#3478)
9917784 is described below

commit 991778474273e9fa150e367bf0e788748559a1a3
Author: 冉小龙 <[email protected]>
AuthorDate: Thu Jan 31 22:47:36 2019 +0800

    [go-client] support consumer seek (#3478)
    
    * [go-client] support consumer seek
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * add unit test for consumer seek
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 pulsar-client-go/pulsar/c_consumer.go    | 24 ++++++++++
 pulsar-client-go/pulsar/c_go_pulsar.h    |  6 +++
 pulsar-client-go/pulsar/consumer.go      |  7 +++
 pulsar-client-go/pulsar/consumer_test.go | 75 +++++++++++++++++++++++++++++---
 4 files changed, 106 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index c78a58e..6f333f1 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -278,3 +278,27 @@ func pulsarConsumerCloseCallbackProxy(res C.pulsar_result, 
ctx unsafe.Pointer) {
 func (c *consumer) RedeliverUnackedMessages() { // forwards directly to the C client's redeliver call for this consumer handle; returns nothing to the caller
        C.pulsar_consumer_redeliver_unacknowledged_messages(c.ptr)
 }
+
+func (c *consumer) Seek(msgID MessageID) error { // Seek is the blocking form of SeekAsync: it returns whatever error (or nil) the callback receives.
+       channel := make(chan error) // unbuffered, so the callback's send waits for the receive at the end of this function
+       c.SeekAsync(msgID, func(err error) {
+               channel <- err
+               close(channel)
+       })
+       return <-channel // blocks until the callback above has been invoked
+}
+
+func (c *consumer) SeekAsync(msgID MessageID, callback func(error)) { // SeekAsync hands the seek request and the callback to the C wrapper _pulsar_consumer_seek_async.
+       C._pulsar_consumer_seek_async(c.ptr, msgID.(*messageID).ptr, 
savePointer(callback)) // unchecked assertion: panics unless msgID is a *messageID. NOTE(review): savePointer presumably produces the ctx that the proxy turns back into callback via restorePointer — confirm
+}
+
+//export pulsarConsumerSeekCallbackProxy
+func pulsarConsumerSeekCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) { // Registered by the C wrapper as the seek callback; forwards the result to the Go callback recovered from ctx.
+       callback := restorePointer(ctx).(func(err error)) // unchecked assertion: panics if ctx does not restore to a func(error)
+
+       if res != C.pulsar_result_Ok {
+               go callback(newError(res, "Failed to seek Consumer")) // started on a new goroutine, so this proxy does not wait for the user callback to finish
+       } else {
+               go callback(nil) // a nil error reports that the result was pulsar_result_Ok
+       }
+}
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h 
b/pulsar-client-go/pulsar/c_go_pulsar.h
index 53c19e3..463503d 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -118,6 +118,12 @@ static inline void 
_pulsar_consumer_close_async(pulsar_consumer_t *consumer, voi
     pulsar_consumer_close_async(consumer, pulsarConsumerCloseCallbackProxy, 
ctx);
 }
 
+void pulsarConsumerSeekCallbackProxy(pulsar_result result, void *ctx); // implemented in Go and exported through cgo
+
+static inline void _pulsar_consumer_seek_async(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId,void *ctx) { // always installs the Go proxy as the callback, so callers supply only an opaque ctx
+    pulsar_consumer_seek_async(consumer, 
messageId,pulsarConsumerSeekCallbackProxy, ctx);
+}
+
 //// Reader callbacks
 
 void pulsarCreateReaderCallbackProxy(pulsar_result result, pulsar_reader_t 
*reader, void *ctx);
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index 6c4ae42..d431daa 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -150,6 +150,13 @@ type Consumer interface {
        // Close the consumer and stop the broker to push more messages
        Close() error
 
+       // Reset the subscription associated with this consumer to a specific 
message id.
+       // The message id can either be a specific message or represent the 
first or last messages in the topic.
+       //
+       // Note: this operation can only be done on non-partitioned topics. For 
partitioned topics, one can instead perform the
+       //       seek() on the individual partitions.
+       Seek(msgID MessageID) error
+
        // Redelivers all the unacknowledged messages. In Failover mode, the 
request is ignored if the consumer is not
        // active for the given topic. In Shared mode, the consumers messages 
to be redelivered are distributed across all
        // the connected consumers. This is a non blocking call and doesn't 
throw an exception. In case the connection
diff --git a/pulsar-client-go/pulsar/consumer_test.go 
b/pulsar-client-go/pulsar/consumer_test.go
index 5b865a4..963921e 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -67,13 +67,13 @@ func TestConsumer(t *testing.T) {
        defer producer.Close()
 
        consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:                                     "my-topic",
-               SubscriptionName:                          "my-sub",
-               AckTimeout:                                1 * time.Minute,
-               Name:                                      "my-consumer-name",
-               ReceiverQueueSize:                         100,
+               Topic:             "my-topic",
+               SubscriptionName:  "my-sub",
+               AckTimeout:        1 * time.Minute,
+               Name:              "my-consumer-name",
+               ReceiverQueueSize: 100,
                MaxTotalReceiverQueueSizeAcrossPartitions: 10000,
-               Type:                                      Shared,
+               Type: Shared,
        })
 
        assert.Nil(t, err)
@@ -101,6 +101,9 @@ func TestConsumer(t *testing.T) {
                consumer.Ack(msg)
        }
 
+       err = consumer.Seek(EarliestMessage)
+       assert.Nil(t, err)
+
        consumer.Unsubscribe()
 }
 
@@ -395,3 +398,63 @@ func TestConsumerRegex(t *testing.T) {
 
        consumer.Unsubscribe()
 }
+
+func TestConsumer_Seek(t *testing.T) { // Publishes and consumes 10 messages, seeks to EarliestMessage, then checks that the next Receive yields the first payload.
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650", // NOTE(review): presumably needs a broker listening on this address — confirm for CI
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := "persistent://public/default/testSeek"
+       subName := "sub-testSeek"
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       assert.Equal(t, producer.Topic(), topicName)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       assert.Nil(t, err)
+       assert.Equal(t, consumer.Topic(), topicName)
+       assert.Equal(t, consumer.Subscription(), subName)
+       defer consumer.Close()
+
+       ctx := context.Background()
+
+       // Send 10 messages synchronously
+       t.Log("Publishing 10 messages synchronously")
+       for msgNum := 0; msgNum < 10; msgNum++ {
+               if err := producer.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       t.Log("Trying to receive 10 messages")
+       for msgNum := 0; msgNum < 10; msgNum++ {
+               _, err := consumer.Receive(ctx) // the received message is discarded; only the error is checked
+               assert.Nil(t, err)
+       }
+
+       // Seek to the earliest position; the next Receive is expected to return the first message.
+       err = consumer.Seek(EarliestMessage)
+       assert.Nil(t, err)
+
+       // Sleeping for 500ms to wait for consumer re-connect
+       time.Sleep(500 * time.Millisecond)
+
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err) // NOTE(review): if assert.Nil is non-fatal (as in testify), msg is still dereferenced below after a failed Receive — confirm
+       t.Logf("again received message:%+v", msg.ID())
+       assert.Equal(t,string(msg.Payload()),"msg-content-0") // "msg-content-0" is the payload of the first message published above
+
+       consumer.Unsubscribe()
+}

Reply via email to