This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c6d905d Different MessageID implementations for message Production
and Consumption (#324)
c6d905d is described below
commit c6d905ddc6e05f0ddd45689f5a46cb9cfa062b71
Author: dferstay <[email protected]>
AuthorDate: Thu Jul 23 19:17:29 2020 -0700
Different MessageID implementations for message Production and Consumption
(#324)
This change splits the `MessageID` implementation in two:
1. `messageID` - A 24 byte structure that contains message identification
information only; to be used during message production
2. `trackingMessageID` - A 72 byte structure that shares the same
message identification information as `messageID`
and adds `ackTracker`, `acker`, and `receivedTime`
fields; to be used during message consumption
Micro benchmarks show that arguments smaller than four words of memory are
optimized by the Go runtime when passed by value. Results from the
pulsar/impl_message_bench_test.go module are below.
```
name time/op
ProducerCall 1.46ns ± 5%
ProducerCall-4 1.47ns ± 5%
ConsumerCall 7.62ns ± 1%
ConsumerCall-4 7.53ns ± 5%
```
Co-authored-by: Daniel Ferstay <[email protected]>
---
pulsar/consumer_impl.go | 14 ++---
pulsar/consumer_multitopic.go | 4 +-
pulsar/consumer_partition.go | 104 +++++++++++++++++++++-----------------
pulsar/consumer_partition_test.go | 14 ++---
pulsar/consumer_regex.go | 4 +-
pulsar/impl_message.go | 41 +++++++++++----
pulsar/impl_message_bench_test.go | 49 ++++++++++++++++++
pulsar/impl_message_test.go | 4 +-
pulsar/producer_test.go | 2 +-
pulsar/reader_impl.go | 37 ++++++++------
10 files changed, 180 insertions(+), 93 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ef93037..b5de1f9 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -56,8 +56,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
- AckID(id messageID)
- NackID(id messageID)
+ AckID(id trackingMessageID)
+ NackID(id trackingMessageID)
}
type consumer struct {
@@ -263,7 +263,7 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState:
c.options.ReplicateSubscriptionState,
- startMessageID: messageID{},
+ startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted:
c.options.ReadCompacted,
interceptors:
c.options.Interceptors,
@@ -488,11 +488,11 @@ func toProtoInitialPosition(p
SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}
-func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
- mid, ok := msgID.(messageID)
+func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
+ mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
- return messageID{}, false
+ return trackingMessageID{}, false
}
partition := int(mid.partitionIdx)
@@ -500,7 +500,7 @@ func (c *consumer) messageID(msgID MessageID) (messageID,
bool) {
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition
between [0-%d]",
partition, len(c.consumers))
- return messageID{}, false
+ return trackingMessageID{}, false
}
return mid, true
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index f526487..8d34203 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -120,7 +120,7 @@ func (c *multiTopicConsumer) Ack(msg Message) {
// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
- mid, ok := msgID.(messageID)
+ mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
@@ -139,7 +139,7 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}
func (c *multiTopicConsumer) NackID(msgID MessageID) {
- mid, ok := msgID.(messageID)
+ mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b83cb88..b3fc17c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -109,7 +109,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
metadata map[string]string
replicateSubscriptionState bool
- startMessageID messageID
+ startMessageID trackingMessageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
@@ -141,13 +141,13 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
- startMessageID messageID
- lastDequeuedMsg messageID
+ startMessageID trackingMessageID
+ lastDequeuedMsg trackingMessageID
eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
- clearQueueCh chan func(id messageID)
+ clearQueueCh chan func(id trackingMessageID)
nackTracker *negativeAcksTracker
dlq *dlqRouter
@@ -175,7 +175,7 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
- clearQueueCh: make(chan func(id messageID)),
+ clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders:
make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
@@ -241,7 +241,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub
*unsubscribeRequest) {
pc.state = consumerClosed
}
-func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
+func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
@@ -269,8 +269,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req
*getLastMsgIDRequest)
}
}
-func (pc *partitionConsumer) AckID(msgID messageID) {
- if !msgID.IsZero() && msgID.ack() {
+func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+ if !msgID.Undefined() && msgID.ack() {
acksCounter.Inc()
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano())
/ 1.0e9)
req := &ackRequest{
@@ -282,8 +282,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
}
}
-func (pc *partitionConsumer) NackID(msgID messageID) {
- pc.nackTracker.Add(msgID)
+func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
+ pc.nackTracker.Add(msgID.messageID)
nacksCounter.Inc()
}
@@ -328,7 +328,7 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}
-func (pc *partitionConsumer) Seek(msgID messageID) error {
+func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
@@ -522,17 +522,17 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
return nil
}
-func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
- if pc.startMessageID.IsZero() {
+func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID)
bool {
+ if pc.startMessageID.Undefined() {
return false
}
if pc.options.startMessageIDInclusive {
- return pc.startMessageID.greater(msgID)
+ return pc.startMessageID.greater(msgID.messageID)
}
// Non inclusive
- return pc.startMessageID.greaterEqual(msgID)
+ return pc.startMessageID.greaterEqual(msgID.messageID)
}
func (pc *partitionConsumer) ConnectionClosed() {
@@ -647,7 +647,7 @@ func (pc *partitionConsumer) dispatcher() {
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by
sending a
// special nil message to the channel so we know when
to stop dropping messages
- var nextMessageInQueue messageID
+ var nextMessageInQueue trackingMessageID
go func() {
pc.queueCh <- nil
}()
@@ -655,8 +655,8 @@ func (pc *partitionConsumer) dispatcher() {
// the queue has been drained
if m == nil {
break
- } else if nextMessageInQueue.IsZero() {
- nextMessageInQueue =
m[0].msgID.(messageID)
+ } else if nextMessageInQueue.Undefined() {
+ nextMessageInQueue =
m[0].msgID.(trackingMessageID)
}
}
@@ -666,7 +666,7 @@ func (pc *partitionConsumer) dispatcher() {
}
type ackRequest struct {
- msgID messageID
+ msgID trackingMessageID
}
type unsubscribeRequest struct {
@@ -684,13 +684,13 @@ type redeliveryRequest struct {
type getLastMsgIDRequest struct {
doneCh chan struct{}
- msgID messageID
+ msgID trackingMessageID
err error
}
type seekRequest struct {
doneCh chan struct{}
- msgID messageID
+ msgID trackingMessageID
err error
}
@@ -870,15 +870,15 @@ func (pc *partitionConsumer) grabConn() error {
}
}
-func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
+func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
if pc.state != consumerReady {
- return messageID{}
+ return trackingMessageID{}
}
wg := &sync.WaitGroup{}
wg.Add(1)
- var msgID messageID
+ var msgID trackingMessageID
- pc.clearQueueCh <- func(id messageID) {
+ pc.clearQueueCh <- func(id trackingMessageID) {
msgID = id
wg.Done()
}
@@ -891,12 +891,12 @@ func (pc *partitionConsumer)
clearQueueAndGetNextMessage() messageID {
* Clear the internal receiver queue and returns the message id of what was
the 1st message in the queue that was
* not seen by the application
*/
-func (pc *partitionConsumer) clearReceiverQueue() messageID {
+func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
nextMessageInQueue := pc.clearQueueAndGetNextMessage()
- if !nextMessageInQueue.IsZero() {
+ if !nextMessageInQueue.Undefined() {
return getPreviousMessage(nextMessageInQueue)
- } else if !pc.lastDequeuedMsg.IsZero() {
+ } else if !pc.lastDequeuedMsg.Undefined() {
// If the queue was empty we need to restart from the message
just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
@@ -906,22 +906,32 @@ func (pc *partitionConsumer) clearReceiverQueue()
messageID {
}
}
-func getPreviousMessage(mid messageID) messageID {
+func getPreviousMessage(mid trackingMessageID) trackingMessageID {
if mid.batchIdx >= 0 {
- return messageID{
- ledgerID: mid.ledgerID,
- entryID: mid.entryID,
- batchIdx: mid.batchIdx - 1,
- partitionIdx: mid.partitionIdx,
+ return trackingMessageID{
+ messageID: messageID{
+ ledgerID: mid.ledgerID,
+ entryID: mid.entryID,
+ batchIdx: mid.batchIdx - 1,
+ partitionIdx: mid.partitionIdx,
+ },
+ tracker: mid.tracker,
+ consumer: mid.consumer,
+ receivedTime: mid.receivedTime,
}
}
// Get on previous message in previous entry
- return messageID{
- ledgerID: mid.ledgerID,
- entryID: mid.entryID - 1,
- batchIdx: mid.batchIdx,
- partitionIdx: mid.partitionIdx,
+ return trackingMessageID{
+ messageID: messageID{
+ ledgerID: mid.ledgerID,
+ entryID: mid.entryID - 1,
+ batchIdx: mid.batchIdx,
+ partitionIdx: mid.partitionIdx,
+ },
+ tracker: mid.tracker,
+ consumer: mid.consumer,
+ receivedTime: mid.receivedTime,
}
}
@@ -977,8 +987,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID
*pb.MessageIdData,
})
}
-func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
- if msgID.IsZero() {
+func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
+ if msgID.Undefined() {
return nil
}
@@ -988,14 +998,16 @@ func convertToMessageIDData(msgID messageID)
*pb.MessageIdData {
}
}
-func convertToMessageID(id *pb.MessageIdData) messageID {
+func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
if id == nil {
- return messageID{}
+ return trackingMessageID{}
}
- msgID := messageID{
- ledgerID: int64(*id.LedgerId),
- entryID: int64(*id.EntryId),
+ msgID := trackingMessageID{
+ messageID: messageID{
+ ledgerID: int64(*id.LedgerId),
+ entryID: int64(*id.EntryId),
+ },
}
if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
diff --git a/pulsar/consumer_partition_test.go
b/pulsar/consumer_partition_test.go
index 01831a4..68cbdd6 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -45,11 +45,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
- assert.Nil(t, m.ID().(messageID).tracker)
+ assert.Nil(t, m.ID().(trackingMessageID).tracker)
}
// ack the message id
- pc.AckID(messages[0].msgID.(messageID))
+ pc.AckID(messages[0].msgID.(trackingMessageID))
select {
case <-eventsCh:
@@ -75,11 +75,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
- assert.Nil(t, m.ID().(messageID).tracker)
+ assert.Nil(t, m.ID().(trackingMessageID).tracker)
}
// ack the message id
- pc.AckID(messages[0].msgID.(messageID))
+ pc.AckID(messages[0].msgID.(trackingMessageID))
select {
case <-eventsCh:
@@ -105,12 +105,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
- assert.NotNil(t, m.ID().(messageID).tracker)
+ assert.NotNil(t, m.ID().(trackingMessageID).tracker)
}
// ack all message ids except the last one
for i := 0; i < 9; i++ {
- pc.AckID(messages[i].msgID.(messageID))
+ pc.AckID(messages[i].msgID.(trackingMessageID))
}
select {
@@ -120,7 +120,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}
// ack last message
- pc.AckID(messages[9].msgID.(messageID))
+ pc.AckID(messages[9].msgID.(trackingMessageID))
select {
case <-eventsCh:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index a6dfd56..e0fdbcb 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -165,7 +165,7 @@ func (c *regexConsumer) Ack(msg Message) {
// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
- mid, ok := msgID.(messageID)
+ mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
@@ -184,7 +184,7 @@ func (c *regexConsumer) Nack(msg Message) {
}
func (c *regexConsumer) NackID(msgID MessageID) {
- mid, ok := msgID.(messageID)
+ mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 562dfb6..d670a37 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -33,17 +33,21 @@ type messageID struct {
entryID int64
batchIdx int32
partitionIdx int32
+}
+
+type trackingMessageID struct {
+ messageID
tracker *ackTracker
consumer acker
receivedTime time.Time
}
-func (id messageID) IsZero() bool {
- return id == messageID{}
+func (id trackingMessageID) Undefined() bool {
+ return id == trackingMessageID{}
}
-func (id messageID) Ack() {
+func (id trackingMessageID) Ack() {
if id.consumer == nil {
return
}
@@ -52,14 +56,14 @@ func (id messageID) Ack() {
}
}
-func (id messageID) Nack() {
+func (id trackingMessageID) Nack() {
if id.consumer == nil {
return
}
id.consumer.NackID(id)
}
-func (id messageID) ack() bool {
+func (id trackingMessageID) ack() bool {
if id.tracker != nil && id.batchIdx > -1 {
return id.tracker.ack(int(id.batchIdx))
}
@@ -124,17 +128,32 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx
int32, partitionIdx in
}
func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32,
partitionIdx int32,
- tracker *ackTracker) messageID {
- return messageID{
- ledgerID: ledgerID,
- entryID: entryID,
- batchIdx: batchIdx,
- partitionIdx: partitionIdx,
+ tracker *ackTracker) trackingMessageID {
+ return trackingMessageID{
+ messageID: messageID{
+ ledgerID: ledgerID,
+ entryID: entryID,
+ batchIdx: batchIdx,
+ partitionIdx: partitionIdx,
+ },
tracker: tracker,
receivedTime: time.Now(),
}
}
+func toTrackingMessageID(msgID MessageID) (trackingMessageID, bool) {
+ if mid, ok := msgID.(messageID); ok {
+ return trackingMessageID{
+ messageID: mid,
+ receivedTime: time.Now(),
+ }, true
+ } else if mid, ok := msgID.(trackingMessageID); ok {
+ return mid, true
+ } else {
+ return trackingMessageID{}, false
+ }
+}
+
func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
ts := int64(timestamp) * int64(time.Millisecond)
seconds := ts / int64(time.Second)
diff --git a/pulsar/impl_message_bench_test.go
b/pulsar/impl_message_bench_test.go
new file mode 100644
index 0000000..4b6ca10
--- /dev/null
+++ b/pulsar/impl_message_bench_test.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "testing"
+)
+
+var (
+ usedByProducer messageID
+ usedByConsumer trackingMessageID
+)
+
+func producerCall(id messageID) messageID {
+ id.entryID++
+ return id
+}
+
+func consumerCall(id trackingMessageID) trackingMessageID {
+ id.entryID++
+ return id
+}
+
+func BenchmarkProducerCall(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ usedByProducer = producerCall(usedByProducer)
+ }
+}
+
+func BenchmarkConsumerCall(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ usedByConsumer = consumerCall(usedByConsumer)
+ }
+}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 8c66411..4e8b644 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -82,7 +82,7 @@ func TestAckingMessageIDBatchOne(t *testing.T) {
func TestAckingMessageIDBatchTwo(t *testing.T) {
tracker := newAckTracker(2)
- ids := []messageID{
+ ids := []trackingMessageID{
newTrackingMessageID(1, 1, 0, 0, tracker),
newTrackingMessageID(1, 1, 1, 0, tracker),
}
@@ -93,7 +93,7 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
// try reverse order
tracker = newAckTracker(2)
- ids = []messageID{
+ ids = []trackingMessageID{
newTrackingMessageID(1, 1, 0, 0, tracker),
newTrackingMessageID(1, 1, 1, 0, tracker),
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index d06ddbb..20cfd91 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -322,7 +322,7 @@ func TestFlushInProducer(t *testing.T) {
assert.Nil(t, err)
msgCount++
- msgID := msg.ID().(messageID)
+ msgID := msg.ID().(trackingMessageID)
// Since messages are batched, they will be sharing the same
ledgerId/entryId
if ledgerID == -1 {
ledgerID = msgID.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index d97cc96..474d0db 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -19,6 +19,8 @@ package pulsar
import (
"context"
+ "fmt"
+ "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -45,7 +47,7 @@ var (
type reader struct {
pc *partitionConsumer
messageCh chan ConsumerMessage
- lastMessageInBroker messageID
+ lastMessageInBroker trackingMessageID
log *log.Entry
}
@@ -59,17 +61,19 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
return nil, newError(ResultInvalidConfiguration,
"StartMessageID is required")
}
- var startMessageID messageID
- var ok bool
- if startMessageID, ok = options.StartMessageID.(messageID); !ok {
- // a custom type satisfying MessageID may not be a messageID
+ startMessageID, ok := toTrackingMessageID(options.StartMessageID)
+ if !ok {
+ // a custom type satisfying MessageID may not be a messageID or
trackingMessageID
// so re-create messageID using its data
deserMsgID, err :=
deserializeMessageID(options.StartMessageID.Serialize())
if err != nil {
return nil, err
}
// de-serialized MessageID is a messageID
- startMessageID = deserMsgID.(messageID)
+ startMessageID = trackingMessageID{
+ messageID: deserMsgID.(messageID),
+ receivedTime: time.Now(),
+ }
}
subscriptionName := options.SubscriptionRolePrefix
@@ -134,10 +138,13 @@ func (r *reader) Next(ctx context.Context) (Message,
error) {
// Acknowledge message immediately because the reader
is based on non-durable subscription. When it reconnects,
// it will specify the subscription position anyway
- msgID := cm.Message.ID().(messageID)
- r.pc.lastDequeuedMsg = msgID
- r.pc.AckID(msgID)
- return cm.Message, nil
+ msgID := cm.Message.ID()
+ if mid, ok := toTrackingMessageID(msgID); ok {
+ r.pc.lastDequeuedMsg = mid
+ r.pc.AckID(mid)
+ return cm.Message, nil
+ }
+ return nil, fmt.Errorf("invalid message id type %T",
msgID)
case <-ctx.Done():
return nil, ctx.Err()
}
@@ -145,7 +152,7 @@ func (r *reader) Next(ctx context.Context) (Message, error)
{
}
func (r *reader) HasNext() bool {
- if !r.lastMessageInBroker.IsZero() && r.hasMoreMessages() {
+ if !r.lastMessageInBroker.Undefined() && r.hasMoreMessages() {
return true
}
@@ -164,16 +171,16 @@ func (r *reader) HasNext() bool {
}
func (r *reader) hasMoreMessages() bool {
- if !r.pc.lastDequeuedMsg.IsZero() {
- return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg)
+ if !r.pc.lastDequeuedMsg.Undefined() {
+ return
r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
}
if r.pc.options.startMessageIDInclusive {
- return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID)
+ return
r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
}
// Non-inclusive
- return r.lastMessageInBroker.greater(r.pc.startMessageID)
+ return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
}
func (r *reader) Close() {