This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2ed8fea [go client] add SubscriptionInitPos option in ConsumerOptions
(#3588)
2ed8fea is described below
commit 2ed8fea94e3565ebe2ffc999412377cf920354c5
Author: 冉小龙 <[email protected]>
AuthorDate: Wed Feb 13 21:26:43 2019 +0800
[go client] add SubscriptionInitPos option in ConsumerOptions (#3588)
* [go client] add SubscriptionInitPos option in ConsumerOptions
Signed-off-by: xiaolong.ran <[email protected]>
* add comment for SubscriptionInitPos option
Signed-off-by: xiaolong.ran <[email protected]>
---
pulsar-client-go/pulsar/c_consumer.go | 4 +++
pulsar-client-go/pulsar/consumer.go | 14 ++++++++++
pulsar-client-go/pulsar/consumer_test.go | 48 +++++++++++++++++++++++++++++++-
3 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-go/pulsar/c_consumer.go
b/pulsar-client-go/pulsar/c_consumer.go
index 6f333f1..9ca73e8 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -100,6 +100,10 @@ func subscribeAsync(client *client, options
ConsumerOptions, callback func(Consu
C.pulsar_consumer_configuration_set_consumer_type(conf,
C.pulsar_consumer_type(options.Type))
}
+ if options.SubscriptionInitPos != Latest {
+ C.pulsar_consumer_set_subscription_initial_position(conf,
C.initial_position(options.SubscriptionInitPos))
+ }
+
// ReceiverQueueSize==0 means to use the default queue size
// -1 means to disable the consumer prefetching
if options.ReceiverQueueSize > 0 {
diff --git a/pulsar-client-go/pulsar/consumer.go
b/pulsar-client-go/pulsar/consumer.go
index d431daa..d7549c4 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -46,6 +46,16 @@ const (
Failover
)
+type InitialPosition int
+
+const (
+ // Latest position which means the start consuming position will be the
last message
+ Latest InitialPosition = iota
+
+ // Earliest position which means the start consuming position will be
the first message
+ Earliest
+)
+
// ConsumerBuilder is used to configure and create instances of Consumer
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
@@ -77,6 +87,10 @@ type ConsumerOptions struct {
// Default is `Exclusive`
Type SubscriptionType
+ // InitialPosition at which the cursor will be set when subscribe
+ // Default is `Latest`
+ SubscriptionInitPos InitialPosition
+
// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for
consumption
MessageChannel chan ConsumerMessage
diff --git a/pulsar-client-go/pulsar/consumer_test.go
b/pulsar-client-go/pulsar/consumer_test.go
index 963921e..e82ffe6 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -454,7 +454,53 @@ func TestConsumer_Seek(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
t.Logf("again received message:%+v", msg.ID())
- assert.Equal(t,string(msg.Payload()),"msg-content-0")
+ assert.Equal(t, "msg-content-0", string(msg.Payload()))
consumer.Unsubscribe()
}
+
+func TestConsumer_SubscriptionInitPos(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "persistent://public/default/testSeek"
+ subName := "test-subscription-initial-earliest-position"
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ //sent message
+ ctx := context.Background()
+
+ err = producer.Send(ctx, ProducerMessage{
+ Payload: []byte("msg-1-content-1"),
+ })
+ assert.Nil(t, err)
+
+ err = producer.Send(ctx, ProducerMessage{
+ Payload: []byte("msg-1-content-2"),
+ })
+ assert.Nil(t, err)
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ SubscriptionInitPos: Earliest,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+
+ assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+}