This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f41441f [Issue-148][pulsar-client-go] add seek by messageID (#168)
f41441f is described below
commit f41441fa89edabda47ecca92cdc486d0c7927b03
Author: steven.wang <[email protected]>
AuthorDate: Mon Feb 3 17:58:26 2020 +1300
[Issue-148][pulsar-client-go] add seek by messageID (#168)
### Motivation
Add seek by messageID
---
pulsar/consumer.go | 7 +++++
pulsar/consumer_impl.go | 56 +++++++++++++++++++++++++---------------
pulsar/consumer_multitopic.go | 5 ++++
pulsar/consumer_partition.go | 49 +++++++++++++++++++++++++++++++++++
pulsar/consumer_regex.go | 5 ++++
pulsar/consumer_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++-
pulsar/internal/connection.go | 1 +
7 files changed, 160 insertions(+), 22 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index af69471..e832b21 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -167,4 +167,11 @@ type Consumer interface {
// Close the consumer and stop the broker to push more messages
Close()
+
+ // Reset the subscription associated with this consumer to a specific message id.
+ // The message id can either be a specific message or represent the first or last messages in the topic.
+ //
+ // Note: this operation can only be done on non-partitioned topics. For partitioned topics,
+ // perform the seek() on the individual partitions instead.
+ Seek(MessageID) error
}
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ede02ef..095050f 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -258,9 +258,8 @@ func (c *consumer) Ack(msg Message) {
// Ack the consumption of a single message, identified by its MessageID
func (c *consumer) AckID(msgID MessageID) {
- mid, ok := msgID.(*messageID)
+ mid, ok := c.messageID(msgID)
if !ok {
- c.log.Warnf("invalid message id type")
return
}
@@ -269,14 +268,7 @@ func (c *consumer) AckID(msgID MessageID) {
return
}
- partition := mid.partitionIdx
- // did we receive a valid partition index?
- if partition < 0 || partition >= len(c.consumers) {
- c.log.Warnf("invalid partition index %d expected a partition
between [0-%d]",
- partition, len(c.consumers))
- return
- }
- c.consumers[partition].AckID(mid)
+ c.consumers[mid.partitionIdx].AckID(mid)
}
func (c *consumer) Nack(msg Message) {
@@ -284,9 +276,8 @@ func (c *consumer) Nack(msg Message) {
}
func (c *consumer) NackID(msgID MessageID) {
- mid, ok := msgID.(*messageID)
+ mid, ok := c.messageID(msgID)
if !ok {
- c.log.Warnf("invalid message id type")
return
}
@@ -295,15 +286,7 @@ func (c *consumer) NackID(msgID MessageID) {
return
}
- partition := mid.partitionIdx
- // did we receive a valid partition index?
- if partition < 0 || partition >= len(c.consumers) {
- c.log.Warnf("invalid partition index %d expected a partition
between [0-%d]",
- partition, len(c.consumers))
- return
- }
-
- c.consumers[partition].NackID(mid)
+ c.consumers[mid.partitionIdx].NackID(mid)
}
func (c *consumer) Close() {
@@ -322,6 +305,19 @@ func (c *consumer) Close() {
})
}
+func (c *consumer) Seek(msgID MessageID) error {
+ if len(c.consumers) > 1 {
+ return errors.New("for partition topic, seek command should
perform on the individual partitions")
+ }
+
+ mid, ok := c.messageID(msgID)
+ if !ok {
+ return nil
+ }
+
+ return c.consumers[mid.partitionIdx].Seek(mid)
+}
+
var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
}
@@ -367,3 +363,21 @@ func toProtoInitialPosition(p SubscriptionInitialPosition)
pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}
+
+func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
+ mid, ok := msgID.(*messageID)
+ if !ok {
+ c.log.Warnf("invalid message id type")
+ return nil, false
+ }
+
+ partition := mid.partitionIdx
+ // did we receive a valid partition index?
+ if partition < 0 || partition >= len(c.consumers) {
+ c.log.Warnf("invalid partition index %d expected a partition
between [0-%d]",
+ partition, len(c.consumers))
+ return nil, false
+ }
+
+ return mid, true
+}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 04a8423..6ba3a4a 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"sync"
@@ -160,3 +161,7 @@ func (c *multiTopicConsumer) Close() {
close(c.closeCh)
})
}
+
+// Seek is not supported by multiTopicConsumer; it always returns an error.
+func (c *multiTopicConsumer) Seek(msgID MessageID) error {
+ return errors.New("seek command not allowed for multi topic consumer")
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ae07e76..185f351 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -256,6 +256,47 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}
+// Seek hands a seekRequest for msgID to the partition's events channel and
+// blocks until the request's doneCh is closed, then returns whatever error
+// was recorded on the request.
+func (pc *partitionConsumer) Seek(msgID *messageID) error {
+	done := make(chan struct{})
+	request := &seekRequest{msgID: msgID, doneCh: done}
+
+	// enqueue the request, then block until the handler closes doneCh
+	pc.eventsCh <- request
+	<-done
+
+	return request.err
+}
+
+func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
+ defer close(seek.doneCh)
+
+ if pc.state == consumerClosing || pc.state == consumerClosed {
+ pc.log.Error("Consumer was already closed")
+ return
+ }
+
+ id := &pb.MessageIdData{}
+ err := proto.Unmarshal(seek.msgID.Serialize(), id)
+ if err != nil {
+ pc.log.WithError(err).Errorf("deserialize message id error:
%s", err.Error())
+ seek.err = err
+ }
+
+ requestID := pc.client.rpcClient.NewRequestID()
+ cmdSeek := &pb.CommandSeek{
+ ConsumerId: proto.Uint64(pc.consumerID),
+ RequestId: proto.Uint64(requestID),
+ MessageId: id,
+ }
+
+ _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID,
pb.BaseCommand_SEEK, cmdSeek)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to reset to message id")
+ seek.err = err
+ }
+}
+
func (pc *partitionConsumer) internalAck(req *ackRequest) {
msgID := req.msgID
@@ -510,6 +551,12 @@ type getLastMsgIDRequest struct {
err error
}
+// seekRequest is the event passed through eventsCh to ask the events loop
+// to run internalSeek.
+type seekRequest struct {
+ doneCh chan struct{} // closed by internalSeek once the request has been handled
+ msgID *messageID // message ID to seek to
+ err error // failure recorded by internalSeek; read by Seek after doneCh is closed
+}
+
func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Debug("exiting events loop")
@@ -528,6 +575,8 @@ func (pc *partitionConsumer) runEventsLoop() {
pc.internalUnsubscribe(v)
case *getLastMsgIDRequest:
pc.internalGetLastMessageID(v)
+ case *seekRequest:
+ pc.internalSeek(v)
case *connectionClosed:
pc.reconnectToBroker()
case *closeRequest:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 1d2f157..4043380 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"regexp"
"strings"
@@ -210,6 +211,10 @@ func (c *regexConsumer) Close() {
})
}
+// Seek is not supported by regexConsumer; it always returns an error.
+func (c *regexConsumer) Seek(msgID MessageID) error {
+ return errors.New("seek command not allowed for regex consumer")
+}
+
func (c *regexConsumer) closed() bool {
select {
case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 52ca7c2..2ba76e8 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -146,8 +146,9 @@ func TestBatchMessageReceive(t *testing.T) {
SubscriptionName: subName,
})
assert.Nil(t, err)
- count := 0
+ defer consumer.Close()
+ count := 0
for i := 0; i < numOfMessages; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
msg := &ProducerMessage{
@@ -534,6 +535,7 @@ func TestConsumerFlow(t *testing.T) {
ReceiverQueueSize: 4,
})
assert.Nil(t, err)
+ defer consumer.Close()
for msgNum := 0; msgNum < 100; msgNum++ {
if _, err := producer.Send(ctx, &ProducerMessage{
@@ -640,6 +642,7 @@ func TestConsumerNack(t *testing.T) {
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
+ defer consumer.Close()
const N = 100
@@ -700,6 +703,7 @@ func TestConsumerCompression(t *testing.T) {
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
+ defer consumer.Close()
const N = 100
@@ -743,6 +747,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
+ defer consumer.Close()
const N = 100
@@ -762,6 +767,58 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
}
}
+// TestConsumerSeek publishes N messages, consumes and acks all of them, then
+// seeks back to the ID of the fifth message and checks that the next message
+// received is that one.
+func TestConsumerSeek(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-1",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	const N = 10
+	var seekID MessageID
+	// Produce exactly N messages so the receive loop below stays in step.
+	for i := 0; i < N; i++ {
+		id, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+
+		// remember the ID of "hello-4" as the seek target
+		if i == 4 {
+			seekID = id
+		}
+	}
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+
+	err = consumer.Seek(seekID)
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+	assert.Equal(t, "hello-4", string(msg.Payload()))
+}
+
func TestConsumerMetadata(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index b31e70f..dace305 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -426,6 +426,7 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
+
case pb.BaseCommand_CLOSE_CONSUMER:
c.handleCloseConsumer(cmd.GetCloseConsumer())