This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9065cb7 Do not allocate MessageIDs on the heap (#319)
9065cb7 is described below
commit 9065cb7e0c3e22e6bd491531ca0344b8c643de00
Author: dferstay <[email protected]>
AuthorDate: Fri Jul 10 09:57:46 2020 -0700
Do not allocate MessageIDs on the heap (#319)
Passing a function parameter by pointer (or writing a pointer into a
channel) can cause the Go escape analysis to treat the pointed-to value
as escaping, which forces the compiler to allocate it on the heap.
This change passes messageID struct instances by value instead of by
pointer; this keeps messageID structs on the stack.
Each message produced or consumed by the library is associated with
a MessageID; keeping instances of the MessageID structure on the stack
reduces heap allocation and associated GC cost.
Signed-off-by: Daniel Ferstay <[email protected]>
Co-authored-by: Daniel Ferstay <[email protected]>
---
pulsar/consumer_impl.go | 16 ++++-----
pulsar/consumer_multitopic.go | 8 ++---
pulsar/consumer_partition.go | 67 ++++++++++++++++++------------------
pulsar/consumer_partition_test.go | 14 ++++----
pulsar/consumer_regex.go | 8 ++---
pulsar/impl_message.go | 28 ++++++++-------
pulsar/impl_message_test.go | 12 +++----
pulsar/negative_acks_tracker.go | 2 +-
pulsar/negative_acks_tracker_test.go | 12 +++----
pulsar/producer_test.go | 2 +-
pulsar/reader_impl.go | 20 +++++------
11 files changed, 96 insertions(+), 93 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index f9ee004..feebcf2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -36,8 +36,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
- AckID(id *messageID)
- NackID(id *messageID)
+ AckID(id messageID)
+ NackID(id messageID)
}
type consumer struct {
@@ -239,7 +239,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState:
c.options.ReplicateSubscriptionState,
- startMessageID: nil,
+ startMessageID: messageID{},
subscriptionMode: durable,
readCompacted:
c.options.ReadCompacted,
}
@@ -456,11 +456,11 @@ func toProtoInitialPosition(p
SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}
-func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
- mid, ok := msgID.(*messageID)
+func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
+ mid, ok := msgID.(messageID)
if !ok {
- c.log.Warnf("invalid message id type")
- return nil, false
+ c.log.Warnf("invalid message id type %T", msgID)
+ return messageID{}, false
}
partition := int(mid.partitionIdx)
@@ -468,7 +468,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
- return nil, false
+ return messageID{}, false
}
return mid, true
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index a5386cb..ec4d57a 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -119,9 +119,9 @@ func (c *multiTopicConsumer) Ack(msg Message) {
// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
- mid, ok := msgID.(*messageID)
+ mid, ok := msgID.(messageID)
if !ok {
- c.log.Warnf("invalid message id type")
+ c.log.Warnf("invalid message id type %T", msgID)
return
}
@@ -138,9 +138,9 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}
func (c *multiTopicConsumer) NackID(msgID MessageID) {
- mid, ok := msgID.(*messageID)
+ mid, ok := msgID.(messageID)
if !ok {
- c.log.Warnf("invalid message id type")
+ c.log.Warnf("invalid message id type %T", msgID)
return
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cffc2b4..e539e51 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -63,7 +63,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
metadata map[string]string
replicateSubscriptionState bool
- startMessageID *messageID
+ startMessageID messageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
@@ -94,13 +94,13 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
- startMessageID *messageID
- lastDequeuedMsg *messageID
+ startMessageID messageID
+ lastDequeuedMsg messageID
eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
- clearQueueCh chan func(id *messageID)
+ clearQueueCh chan func(id messageID)
nackTracker *negativeAcksTracker
dlq *dlqRouter
@@ -128,7 +128,7 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
- clearQueueCh: make(chan func(id *messageID)),
+ clearQueueCh: make(chan func(id messageID)),
compressionProviders:
make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
@@ -192,7 +192,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub
*unsubscribeRequest) {
pc.state = consumerClosed
}
-func (pc *partitionConsumer) getLastMessageID() (*messageID, error) {
+func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
@@ -220,8 +220,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req
*getLastMsgIDRequest)
}
}
-func (pc *partitionConsumer) AckID(msgID *messageID) {
- if msgID != nil && msgID.ack() {
+func (pc *partitionConsumer) AckID(msgID messageID) {
+ if !msgID.IsZero() && msgID.ack() {
req := &ackRequest{
msgID: msgID,
}
@@ -229,7 +229,7 @@ func (pc *partitionConsumer) AckID(msgID *messageID) {
}
}
-func (pc *partitionConsumer) NackID(msgID *messageID) {
+func (pc *partitionConsumer) NackID(msgID messageID) {
pc.nackTracker.Add(msgID)
}
@@ -268,7 +268,7 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}
-func (pc *partitionConsumer) Seek(msgID *messageID) error {
+func (pc *partitionConsumer) Seek(msgID messageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
@@ -450,8 +450,8 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
return nil
}
-func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *messageID) bool {
- if pc.startMessageID == nil {
+func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
+ if pc.startMessageID.IsZero() {
return false
}
@@ -571,7 +571,7 @@ func (pc *partitionConsumer) dispatcher() {
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
- var nextMessageInQueue *messageID
+ var nextMessageInQueue messageID
go func() {
pc.queueCh <- nil
}()
@@ -579,8 +579,8 @@ func (pc *partitionConsumer) dispatcher() {
// the queue has been drained
if m == nil {
break
- } else if nextMessageInQueue == nil {
- nextMessageInQueue =
m[0].msgID.(*messageID)
+ } else if nextMessageInQueue.IsZero() {
+ nextMessageInQueue =
m[0].msgID.(messageID)
}
}
@@ -590,7 +590,7 @@ func (pc *partitionConsumer) dispatcher() {
}
type ackRequest struct {
- msgID *messageID
+ msgID messageID
}
type unsubscribeRequest struct {
@@ -608,13 +608,13 @@ type redeliveryRequest struct {
type getLastMsgIDRequest struct {
doneCh chan struct{}
- msgID *messageID
+ msgID messageID
err error
}
type seekRequest struct {
doneCh chan struct{}
- msgID *messageID
+ msgID messageID
err error
}
@@ -794,15 +794,15 @@ func (pc *partitionConsumer) grabConn() error {
}
}
-func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID {
+func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
if pc.state != consumerReady {
- return nil
+ return messageID{}
}
wg := &sync.WaitGroup{}
wg.Add(1)
- var msgID *messageID
+ var msgID messageID
- pc.clearQueueCh <- func(id *messageID) {
+ pc.clearQueueCh <- func(id messageID) {
msgID = id
wg.Done()
}
@@ -815,12 +815,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application
*/
-func (pc *partitionConsumer) clearReceiverQueue() *messageID {
+func (pc *partitionConsumer) clearReceiverQueue() messageID {
nextMessageInQueue := pc.clearQueueAndGetNextMessage()
- if nextMessageInQueue != nil {
+ if !nextMessageInQueue.IsZero() {
return getPreviousMessage(nextMessageInQueue)
- } else if pc.lastDequeuedMsg != nil {
+ } else if !pc.lastDequeuedMsg.IsZero() {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
@@ -830,9 +830,9 @@ func (pc *partitionConsumer) clearReceiverQueue()
*messageID {
}
}
-func getPreviousMessage(mid *messageID) *messageID {
+func getPreviousMessage(mid messageID) messageID {
if mid.batchIdx >= 0 {
- return &messageID{
+ return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
@@ -841,7 +841,7 @@ func getPreviousMessage(mid *messageID) *messageID {
}
// Get on previous message in previous entry
- return &messageID{
+ return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
@@ -901,8 +901,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID
*pb.MessageIdData,
})
}
-func convertToMessageIDData(msgID *messageID) *pb.MessageIdData {
- if msgID == nil {
+func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
+ if msgID.IsZero() {
return nil
}
@@ -912,16 +912,15 @@ func convertToMessageIDData(msgID *messageID)
*pb.MessageIdData {
}
}
-func convertToMessageID(id *pb.MessageIdData) *messageID {
+func convertToMessageID(id *pb.MessageIdData) messageID {
if id == nil {
- return nil
+ return messageID{}
}
- msgID := &messageID{
+ msgID := messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
}
-
if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
}
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 5a5a94e..0fcbdc5 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
- assert.Nil(t, m.ID().(*messageID).tracker)
+ assert.Nil(t, m.ID().(messageID).tracker)
}
// ack the message id
- pc.AckID(messages[0].msgID.(*messageID))
+ pc.AckID(messages[0].msgID.(messageID))
select {
case <-eventsCh:
@@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
- assert.Nil(t, m.ID().(*messageID).tracker)
+ assert.Nil(t, m.ID().(messageID).tracker)
}
// ack the message id
- pc.AckID(messages[0].msgID.(*messageID))
+ pc.AckID(messages[0].msgID.(messageID))
select {
case <-eventsCh:
@@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
- assert.NotNil(t, m.ID().(*messageID).tracker)
+ assert.NotNil(t, m.ID().(messageID).tracker)
}
// ack all message ids except the last one
for i := 0; i < 9; i++ {
- pc.AckID(messages[i].msgID.(*messageID))
+ pc.AckID(messages[i].msgID.(messageID))
}
select {
@@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}
// ack last message
- pc.AckID(messages[9].msgID.(*messageID))
+ pc.AckID(messages[9].msgID.(messageID))
select {
case <-eventsCh:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 75ca657..ff7cbca 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -164,9 +164,9 @@ func (c *regexConsumer) Ack(msg Message) {
// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
- mid, ok := msgID.(*messageID)
+ mid, ok := msgID.(messageID)
if !ok {
- c.log.Warnf("invalid message id type")
+ c.log.Warnf("invalid message id type %T", msgID)
return
}
@@ -183,9 +183,9 @@ func (c *regexConsumer) Nack(msg Message) {
}
func (c *regexConsumer) NackID(msgID MessageID) {
- mid, ok := msgID.(*messageID)
+ mid, ok := msgID.(messageID)
if !ok {
- c.log.Warnf("invalid message id type")
+ c.log.Warnf("invalid message id type %T", msgID)
return
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d9574cd..f1a9a7c 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -38,7 +38,11 @@ type messageID struct {
consumer acker
}
-func (id *messageID) Ack() {
+func (id messageID) IsZero() bool {
+ return id == messageID{}
+}
+
+func (id messageID) Ack() {
if id.consumer == nil {
return
}
@@ -47,21 +51,21 @@ func (id *messageID) Ack() {
}
}
-func (id *messageID) Nack() {
+func (id messageID) Nack() {
if id.consumer == nil {
return
}
id.consumer.NackID(id)
}
-func (id *messageID) ack() bool {
+func (id messageID) ack() bool {
if id.tracker != nil && id.batchIdx > -1 {
return id.tracker.ack(int(id.batchIdx))
}
return true
}
-func (id *messageID) greater(other *messageID) bool {
+func (id messageID) greater(other messageID) bool {
if id.ledgerID != other.ledgerID {
return id.ledgerID > other.ledgerID
}
@@ -73,22 +77,22 @@ func (id *messageID) greater(other *messageID) bool {
return id.batchIdx > other.batchIdx
}
-func (id *messageID) equal(other *messageID) bool {
+func (id messageID) equal(other messageID) bool {
return id.ledgerID == other.ledgerID &&
id.entryID == other.entryID &&
id.batchIdx == other.batchIdx
}
-func (id *messageID) greaterEqual(other *messageID) bool {
+func (id messageID) greaterEqual(other messageID) bool {
return id.equal(other) || id.greater(other)
}
-func (id *messageID) Serialize() []byte {
+func (id messageID) Serialize() []byte {
msgID := &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(id.ledgerID)),
EntryId: proto.Uint64(uint64(id.entryID)),
- BatchIndex: proto.Int(int(id.batchIdx)),
- Partition: proto.Int(int(id.partitionIdx)),
+ BatchIndex: proto.Int32(id.batchIdx),
+ Partition: proto.Int32(id.partitionIdx),
}
data, _ := proto.Marshal(msgID)
return data
@@ -110,7 +114,7 @@ func deserializeMessageID(data []byte) (MessageID, error) {
}
func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx
int32) MessageID {
- return &messageID{
+ return messageID{
ledgerID: ledgerID,
entryID: entryID,
batchIdx: batchIdx,
@@ -119,8 +123,8 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx
int32, partitionIdx in
}
func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32,
partitionIdx int32,
- tracker *ackTracker) *messageID {
- return &messageID{
+ tracker *ackTracker) messageID {
+ return messageID{
ledgerID: ledgerID,
entryID: entryID,
batchIdx: batchIdx,
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 164cff6..8c66411 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -31,10 +31,10 @@ func TestMessageId(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, id2)
- assert.Equal(t, int64(1), id2.(*messageID).ledgerID)
- assert.Equal(t, int64(2), id2.(*messageID).entryID)
- assert.Equal(t, int32(3), id2.(*messageID).batchIdx)
- assert.Equal(t, int32(4), id2.(*messageID).partitionIdx)
+ assert.Equal(t, int64(1), id2.(messageID).ledgerID)
+ assert.Equal(t, int64(2), id2.(messageID).entryID)
+ assert.Equal(t, int32(3), id2.(messageID).batchIdx)
+ assert.Equal(t, int32(4), id2.(messageID).partitionIdx)
id, err = DeserializeMessageID(nil)
assert.Error(t, err)
@@ -82,7 +82,7 @@ func TestAckingMessageIDBatchOne(t *testing.T) {
func TestAckingMessageIDBatchTwo(t *testing.T) {
tracker := newAckTracker(2)
- ids := []*messageID{
+ ids := []messageID{
newTrackingMessageID(1, 1, 0, 0, tracker),
newTrackingMessageID(1, 1, 1, 0, tracker),
}
@@ -93,7 +93,7 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
// try reverse order
tracker = newAckTracker(2)
- ids = []*messageID{
+ ids = []messageID{
newTrackingMessageID(1, 1, 0, 0, tracker),
newTrackingMessageID(1, 1, 1, 0, tracker),
}
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index f887844..a7dc88e 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -51,7 +51,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay
time.Duration) *negativ
return t
}
-func (t *negativeAcksTracker) Add(msgID *messageID) {
+func (t *negativeAcksTracker) Add(msgID messageID) {
// Always clear up the batch index since we want to track the nack
// for the entire batch
batchMsgID := messageID{
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index f7a1c50..3930b7f 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -76,13 +76,13 @@ func TestNacksTracker(t *testing.T) {
nmc := newNackMockedConsumer()
nacks := newNegativeAcksTracker(nmc, testNackDelay)
- nacks.Add(&messageID{
+ nacks.Add(messageID{
ledgerID: 1,
entryID: 1,
batchIdx: 1,
})
- nacks.Add(&messageID{
+ nacks.Add(messageID{
ledgerID: 2,
entryID: 2,
batchIdx: 1,
@@ -107,25 +107,25 @@ func TestNacksWithBatchesTracker(t *testing.T) {
nmc := newNackMockedConsumer()
nacks := newNegativeAcksTracker(nmc, testNackDelay)
- nacks.Add(&messageID{
+ nacks.Add(messageID{
ledgerID: 1,
entryID: 1,
batchIdx: 1,
})
- nacks.Add(&messageID{
+ nacks.Add(messageID{
ledgerID: 1,
entryID: 1,
batchIdx: 2,
})
- nacks.Add(&messageID{
+ nacks.Add(messageID{
ledgerID: 1,
entryID: 1,
batchIdx: 3,
})
- nacks.Add(&messageID{
+ nacks.Add(messageID{
ledgerID: 2,
entryID: 2,
batchIdx: 1,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 95e4ed1..8d389cb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -322,7 +322,7 @@ func TestFlushInProducer(t *testing.T) {
assert.Nil(t, err)
msgCount++
- msgID := msg.ID().(*messageID)
+ msgID := msg.ID().(messageID)
// Since messages are batched, they will be sharing the same ledgerId/entryId
if ledgerID == -1 {
ledgerID = msgID.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index b3399dc..b74b35b 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -30,7 +30,7 @@ const (
type reader struct {
pc *partitionConsumer
messageCh chan ConsumerMessage
- lastMessageInBroker *messageID
+ lastMessageInBroker messageID
log *log.Entry
}
@@ -44,17 +44,17 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
return nil, newError(ResultInvalidConfiguration, "StartMessageID is required")
}
- var startMessageID *messageID
+ var startMessageID messageID
var ok bool
- if startMessageID, ok = options.StartMessageID.(*messageID); !ok {
- // a custom type satisfying MessageID may not be a *messageID
- // so re-create *messageID using its data
+ if startMessageID, ok = options.StartMessageID.(messageID); !ok {
+ // a custom type satisfying MessageID may not be a messageID
+ // so re-create messageID using its data
deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
if err != nil {
return nil, err
}
- // de-serialized MessageID is a *messageID
- startMessageID = deserMsgID.(*messageID)
+ // de-serialized MessageID is a messageID
+ startMessageID = deserMsgID.(messageID)
}
subscriptionName := options.SubscriptionRolePrefix
@@ -118,7 +118,7 @@ func (r *reader) Next(ctx context.Context) (Message, error)
{
// Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects,
// it will specify the subscription position anyway
- msgID := cm.Message.ID().(*messageID)
+ msgID := cm.Message.ID().(messageID)
r.pc.lastDequeuedMsg = msgID
r.pc.AckID(msgID)
return cm.Message, nil
@@ -129,7 +129,7 @@ func (r *reader) Next(ctx context.Context) (Message, error)
{
}
func (r *reader) HasNext() bool {
- if r.lastMessageInBroker != nil && r.hasMoreMessages() {
+ if !r.lastMessageInBroker.IsZero() && r.hasMoreMessages() {
return true
}
@@ -148,7 +148,7 @@ func (r *reader) HasNext() bool {
}
func (r *reader) hasMoreMessages() bool {
- if r.pc.lastDequeuedMsg != nil {
+ if !r.pc.lastDequeuedMsg.IsZero() {
return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg)
}