This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9917784 [go-client] support consumer seek (#3478)
9917784 is described below
commit 991778474273e9fa150e367bf0e788748559a1a3
Author: 冉小龙 <[email protected]>
AuthorDate: Thu Jan 31 22:47:36 2019 +0800
[go-client] support consumer seek (#3478)
* [go-client] support consumer seek
Signed-off-by: xiaolong.ran <[email protected]>
* add unit test for consumer seek
Signed-off-by: xiaolong.ran <[email protected]>
---
pulsar-client-go/pulsar/c_consumer.go | 24 ++++++++++
pulsar-client-go/pulsar/c_go_pulsar.h | 6 +++
pulsar-client-go/pulsar/consumer.go | 7 +++
pulsar-client-go/pulsar/consumer_test.go | 75 +++++++++++++++++++++++++++++---
4 files changed, 106 insertions(+), 6 deletions(-)
diff --git a/pulsar-client-go/pulsar/c_consumer.go
b/pulsar-client-go/pulsar/c_consumer.go
index c78a58e..6f333f1 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -278,3 +278,27 @@ func pulsarConsumerCloseCallbackProxy(res C.pulsar_result,
ctx unsafe.Pointer) {
func (c *consumer) RedeliverUnackedMessages() {
C.pulsar_consumer_redeliver_unacknowledged_messages(c.ptr)
}
+
+func (c *consumer) Seek(msgID MessageID) error {
+ channel := make(chan error)
+ c.SeekAsync(msgID, func(err error) {
+ channel <- err
+ close(channel)
+ })
+ return <-channel
+}
+
+func (c *consumer) SeekAsync(msgID MessageID, callback func(error)) {
+ C._pulsar_consumer_seek_async(c.ptr, msgID.(*messageID).ptr,
savePointer(callback))
+}
+
+//export pulsarConsumerSeekCallbackProxy
+func pulsarConsumerSeekCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
+ callback := restorePointer(ctx).(func(err error))
+
+ if res != C.pulsar_result_Ok {
+ go callback(newError(res, "Failed to seek Consumer"))
+ } else {
+ go callback(nil)
+ }
+}
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h
b/pulsar-client-go/pulsar/c_go_pulsar.h
index 53c19e3..463503d 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -118,6 +118,12 @@ static inline void
_pulsar_consumer_close_async(pulsar_consumer_t *consumer, voi
pulsar_consumer_close_async(consumer, pulsarConsumerCloseCallbackProxy,
ctx);
}
+void pulsarConsumerSeekCallbackProxy(pulsar_result result, void *ctx);
+
+static inline void _pulsar_consumer_seek_async(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId,void *ctx) {
+ pulsar_consumer_seek_async(consumer,
messageId,pulsarConsumerSeekCallbackProxy, ctx);
+}
+
//// Reader callbacks
void pulsarCreateReaderCallbackProxy(pulsar_result result, pulsar_reader_t
*reader, void *ctx);
diff --git a/pulsar-client-go/pulsar/consumer.go
b/pulsar-client-go/pulsar/consumer.go
index 6c4ae42..d431daa 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -150,6 +150,13 @@ type Consumer interface {
// Close the consumer and stop the broker to push more messages
Close() error
+ // Reset the subscription associated with this consumer to a specific
message id.
+ // The message id can either be a specific message or represent the
first or last messages in the topic.
+ //
+ // Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the
+ // seek() on the individual partitions.
+ Seek(msgID MessageID) error
+
// Redelivers all the unacknowledged messages. In Failover mode, the
request is ignored if the consumer is not
// active for the given topic. In Shared mode, the consumers messages
to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't
throw an exception. In case the connection
diff --git a/pulsar-client-go/pulsar/consumer_test.go
b/pulsar-client-go/pulsar/consumer_test.go
index 5b865a4..963921e 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -67,13 +67,13 @@ func TestConsumer(t *testing.T) {
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
- Topic: "my-topic",
- SubscriptionName: "my-sub",
- AckTimeout: 1 * time.Minute,
- Name: "my-consumer-name",
- ReceiverQueueSize: 100,
+ Topic: "my-topic",
+ SubscriptionName: "my-sub",
+ AckTimeout: 1 * time.Minute,
+ Name: "my-consumer-name",
+ ReceiverQueueSize: 100,
MaxTotalReceiverQueueSizeAcrossPartitions: 10000,
- Type: Shared,
+ Type: Shared,
})
assert.Nil(t, err)
@@ -101,6 +101,9 @@ func TestConsumer(t *testing.T) {
consumer.Ack(msg)
}
+ err = consumer.Seek(EarliestMessage)
+ assert.Nil(t, err)
+
consumer.Unsubscribe()
}
@@ -395,3 +398,63 @@ func TestConsumerRegex(t *testing.T) {
consumer.Unsubscribe()
}
+
+func TestConsumer_Seek(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "persistent://public/default/testSeek"
+ subName := "sub-testSeek"
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, producer.Topic(), topicName)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, consumer.Topic(), topicName)
+ assert.Equal(t, consumer.Subscription(), subName)
+ defer consumer.Close()
+
+ ctx := context.Background()
+
+ // Send 10 messages synchronously
+ t.Log("Publishing 10 messages synchronously")
+ for msgNum := 0; msgNum < 10; msgNum++ {
+ if err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ t.Log("Trying to receive 10 messages")
+ for msgNum := 0; msgNum < 10; msgNum++ {
+ _, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ }
+
+ // seek to earliest, expected receive first message.
+ err = consumer.Seek(EarliestMessage)
+ assert.Nil(t, err)
+
+ // Sleeping for 500ms to wait for consumer re-connect
+ time.Sleep(500 * time.Millisecond)
+
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ t.Logf("again received message:%+v", msg.ID())
+ assert.Equal(t,string(msg.Payload()),"msg-content-0")
+
+ consumer.Unsubscribe()
+}