This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9065cb7  Do not allocate MessageIDs on the heap (#319)
9065cb7 is described below

commit 9065cb7e0c3e22e6bd491531ca0344b8c643de00
Author: dferstay <[email protected]>
AuthorDate: Fri Jul 10 09:57:46 2020 -0700

    Do not allocate MessageIDs on the heap (#319)
    
    Passing a function parameter by pointer (or writing a pointer into a
    channel) can make Go's escape analysis conclude that the pointed-to
    value escapes, in which case the compiler allocates it on the heap.
    
    This change passes messageID struct instances by value instead of by
    pointer; this keeps messageID structs on the stack.
    
    Each message produced or consumed by the library is associated with
    a MessageID; keeping instances of the MessageID structure on the stack
    reduces heap allocation and associated GC cost.
    
    Signed-off-by: Daniel Ferstay <[email protected]>
    
    Co-authored-by: Daniel Ferstay <[email protected]>
---
 pulsar/consumer_impl.go              | 16 ++++-----
 pulsar/consumer_multitopic.go        |  8 ++---
 pulsar/consumer_partition.go         | 67 ++++++++++++++++++------------------
 pulsar/consumer_partition_test.go    | 14 ++++----
 pulsar/consumer_regex.go             |  8 ++---
 pulsar/impl_message.go               | 28 ++++++++-------
 pulsar/impl_message_test.go          | 12 +++----
 pulsar/negative_acks_tracker.go      |  2 +-
 pulsar/negative_acks_tracker_test.go | 12 +++----
 pulsar/producer_test.go              |  2 +-
 pulsar/reader_impl.go                | 20 +++++------
 11 files changed, 96 insertions(+), 93 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index f9ee004..feebcf2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -36,8 +36,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
 const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
-       AckID(id *messageID)
-       NackID(id *messageID)
+       AckID(id messageID)
+       NackID(id messageID)
 }
 
 type consumer struct {
@@ -239,7 +239,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                nackRedeliveryDelay:        nackRedeliveryDelay,
                                metadata:                   metadata,
                                replicateSubscriptionState: 
c.options.ReplicateSubscriptionState,
-                               startMessageID:             nil,
+                               startMessageID:             messageID{},
                                subscriptionMode:           durable,
                                readCompacted:              
c.options.ReadCompacted,
                        }
@@ -456,11 +456,11 @@ func toProtoInitialPosition(p 
SubscriptionInitialPosition) pb.CommandSubscribe_I
        return pb.CommandSubscribe_Latest
 }
 
-func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
-       mid, ok := msgID.(*messageID)
+func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
+       mid, ok := msgID.(messageID)
        if !ok {
-               c.log.Warnf("invalid message id type")
-               return nil, false
+               c.log.Warnf("invalid message id type %T", msgID)
+               return messageID{}, false
        }
 
        partition := int(mid.partitionIdx)
@@ -468,7 +468,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, 
bool) {
        if partition < 0 || partition >= len(c.consumers) {
                c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
                        partition, len(c.consumers))
-               return nil, false
+               return messageID{}, false
        }
 
        return mid, true
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index a5386cb..ec4d57a 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -119,9 +119,9 @@ func (c *multiTopicConsumer) Ack(msg Message) {
 
 // Ack the consumption of a single message, identified by its MessageID
 func (c *multiTopicConsumer) AckID(msgID MessageID) {
-       mid, ok := msgID.(*messageID)
+       mid, ok := msgID.(messageID)
        if !ok {
-               c.log.Warnf("invalid message id type")
+               c.log.Warnf("invalid message id type %T", msgID)
                return
        }
 
@@ -138,9 +138,9 @@ func (c *multiTopicConsumer) Nack(msg Message) {
 }
 
 func (c *multiTopicConsumer) NackID(msgID MessageID) {
-       mid, ok := msgID.(*messageID)
+       mid, ok := msgID.(messageID)
        if !ok {
-               c.log.Warnf("invalid message id type")
+               c.log.Warnf("invalid message id type %T", msgID)
                return
        }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cffc2b4..e539e51 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -63,7 +63,7 @@ type partitionConsumerOpts struct {
        nackRedeliveryDelay        time.Duration
        metadata                   map[string]string
        replicateSubscriptionState bool
-       startMessageID             *messageID
+       startMessageID             messageID
        startMessageIDInclusive    bool
        subscriptionMode           subscriptionMode
        readCompacted              bool
@@ -94,13 +94,13 @@ type partitionConsumer struct {
        // the size of the queue channel for buffering messages
        queueSize       int32
        queueCh         chan []*message
-       startMessageID  *messageID
-       lastDequeuedMsg *messageID
+       startMessageID  messageID
+       lastDequeuedMsg messageID
 
        eventsCh     chan interface{}
        connectedCh  chan struct{}
        closeCh      chan struct{}
-       clearQueueCh chan func(id *messageID)
+       clearQueueCh chan func(id messageID)
 
        nackTracker *negativeAcksTracker
        dlq         *dlqRouter
@@ -128,7 +128,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                connectedCh:          make(chan struct{}),
                messageCh:            messageCh,
                closeCh:              make(chan struct{}),
-               clearQueueCh:         make(chan func(id *messageID)),
+               clearQueueCh:         make(chan func(id messageID)),
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                dlq:                  dlq,
                log:                  log.WithField("topic", options.topic),
@@ -192,7 +192,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*unsubscribeRequest) {
        pc.state = consumerClosed
 }
 
-func (pc *partitionConsumer) getLastMessageID() (*messageID, error) {
+func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
        req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
        pc.eventsCh <- req
 
@@ -220,8 +220,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest)
        }
 }
 
-func (pc *partitionConsumer) AckID(msgID *messageID) {
-       if msgID != nil && msgID.ack() {
+func (pc *partitionConsumer) AckID(msgID messageID) {
+       if !msgID.IsZero() && msgID.ack() {
                req := &ackRequest{
                        msgID: msgID,
                }
@@ -229,7 +229,7 @@ func (pc *partitionConsumer) AckID(msgID *messageID) {
        }
 }
 
-func (pc *partitionConsumer) NackID(msgID *messageID) {
+func (pc *partitionConsumer) NackID(msgID messageID) {
        pc.nackTracker.Add(msgID)
 }
 
@@ -268,7 +268,7 @@ func (pc *partitionConsumer) Close() {
        <-req.doneCh
 }
 
-func (pc *partitionConsumer) Seek(msgID *messageID) error {
+func (pc *partitionConsumer) Seek(msgID messageID) error {
        req := &seekRequest{
                doneCh: make(chan struct{}),
                msgID:  msgID,
@@ -450,8 +450,8 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        return nil
 }
 
-func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *messageID) bool {
-       if pc.startMessageID == nil {
+func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
+       if pc.startMessageID.IsZero() {
                return false
        }
 
@@ -571,7 +571,7 @@ func (pc *partitionConsumer) dispatcher() {
                case clearQueueCb := <-pc.clearQueueCh:
                        // drain the message queue on any new connection by 
sending a
                        // special nil message to the channel so we know when 
to stop dropping messages
-                       var nextMessageInQueue *messageID
+                       var nextMessageInQueue messageID
                        go func() {
                                pc.queueCh <- nil
                        }()
@@ -579,8 +579,8 @@ func (pc *partitionConsumer) dispatcher() {
                                // the queue has been drained
                                if m == nil {
                                        break
-                               } else if nextMessageInQueue == nil {
-                                       nextMessageInQueue = m[0].msgID.(*messageID)
+                               } else if nextMessageInQueue.IsZero() {
+                                       nextMessageInQueue = m[0].msgID.(messageID)
                                }
                        }
 
@@ -590,7 +590,7 @@ func (pc *partitionConsumer) dispatcher() {
 }
 
 type ackRequest struct {
-       msgID *messageID
+       msgID messageID
 }
 
 type unsubscribeRequest struct {
@@ -608,13 +608,13 @@ type redeliveryRequest struct {
 
 type getLastMsgIDRequest struct {
        doneCh chan struct{}
-       msgID  *messageID
+       msgID  messageID
        err    error
 }
 
 type seekRequest struct {
        doneCh chan struct{}
-       msgID  *messageID
+       msgID  messageID
        err    error
 }
 
@@ -794,15 +794,15 @@ func (pc *partitionConsumer) grabConn() error {
        }
 }
 
-func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID {
+func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
        if pc.state != consumerReady {
-               return nil
+               return messageID{}
        }
        wg := &sync.WaitGroup{}
        wg.Add(1)
-       var msgID *messageID
+       var msgID messageID
 
-       pc.clearQueueCh <- func(id *messageID) {
+       pc.clearQueueCh <- func(id messageID) {
                msgID = id
                wg.Done()
        }
@@ -815,12 +815,12 @@ func (pc *partitionConsumer) 
clearQueueAndGetNextMessage() *messageID {
  * Clear the internal receiver queue and returns the message id of what was 
the 1st message in the queue that was
  * not seen by the application
  */
-func (pc *partitionConsumer) clearReceiverQueue() *messageID {
+func (pc *partitionConsumer) clearReceiverQueue() messageID {
        nextMessageInQueue := pc.clearQueueAndGetNextMessage()
 
-       if nextMessageInQueue != nil {
+       if !nextMessageInQueue.IsZero() {
                return getPreviousMessage(nextMessageInQueue)
-       } else if pc.lastDequeuedMsg != nil {
+       } else if !pc.lastDequeuedMsg.IsZero() {
                // If the queue was empty we need to restart from the message 
just after the last one that has been dequeued
                // in the past
                return pc.lastDequeuedMsg
@@ -830,9 +830,9 @@ func (pc *partitionConsumer) clearReceiverQueue() 
*messageID {
        }
 }
 
-func getPreviousMessage(mid *messageID) *messageID {
+func getPreviousMessage(mid messageID) messageID {
        if mid.batchIdx >= 0 {
-               return &messageID{
+               return messageID{
                        ledgerID:     mid.ledgerID,
                        entryID:      mid.entryID,
                        batchIdx:     mid.batchIdx - 1,
@@ -841,7 +841,7 @@ func getPreviousMessage(mid *messageID) *messageID {
        }
 
        // Get on previous message in previous entry
-       return &messageID{
+       return messageID{
                ledgerID:     mid.ledgerID,
                entryID:      mid.entryID - 1,
                batchIdx:     mid.batchIdx,
@@ -901,8 +901,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID 
*pb.MessageIdData,
                })
 }
 
-func convertToMessageIDData(msgID *messageID) *pb.MessageIdData {
-       if msgID == nil {
+func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
+       if msgID.IsZero() {
                return nil
        }
 
@@ -912,16 +912,15 @@ func convertToMessageIDData(msgID *messageID) 
*pb.MessageIdData {
        }
 }
 
-func convertToMessageID(id *pb.MessageIdData) *messageID {
+func convertToMessageID(id *pb.MessageIdData) messageID {
        if id == nil {
-               return nil
+               return messageID{}
        }
 
-       msgID := &messageID{
+       msgID := messageID{
                ledgerID: int64(*id.LedgerId),
                entryID:  int64(*id.EntryId),
        }
-
        if id.BatchIndex != nil {
                msgID.batchIdx = *id.BatchIndex
        }
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 5a5a94e..0fcbdc5 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
        // ensure the tracker was set on the message id
        messages := <-pc.queueCh
        for _, m := range messages {
-               assert.Nil(t, m.ID().(*messageID).tracker)
+               assert.Nil(t, m.ID().(messageID).tracker)
        }
 
        // ack the message id
-       pc.AckID(messages[0].msgID.(*messageID))
+       pc.AckID(messages[0].msgID.(messageID))
 
        select {
        case <-eventsCh:
@@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
        // ensure the tracker was set on the message id
        messages := <-pc.queueCh
        for _, m := range messages {
-               assert.Nil(t, m.ID().(*messageID).tracker)
+               assert.Nil(t, m.ID().(messageID).tracker)
        }
 
        // ack the message id
-       pc.AckID(messages[0].msgID.(*messageID))
+       pc.AckID(messages[0].msgID.(messageID))
 
        select {
        case <-eventsCh:
@@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
        // ensure the tracker was set on the message id
        messages := <-pc.queueCh
        for _, m := range messages {
-               assert.NotNil(t, m.ID().(*messageID).tracker)
+               assert.NotNil(t, m.ID().(messageID).tracker)
        }
 
        // ack all message ids except the last one
        for i := 0; i < 9; i++ {
-               pc.AckID(messages[i].msgID.(*messageID))
+               pc.AckID(messages[i].msgID.(messageID))
        }
 
        select {
@@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
        }
 
        // ack last message
-       pc.AckID(messages[9].msgID.(*messageID))
+       pc.AckID(messages[9].msgID.(messageID))
 
        select {
        case <-eventsCh:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 75ca657..ff7cbca 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -164,9 +164,9 @@ func (c *regexConsumer) Ack(msg Message) {
 
 // Ack the consumption of a single message, identified by its MessageID
 func (c *regexConsumer) AckID(msgID MessageID) {
-       mid, ok := msgID.(*messageID)
+       mid, ok := msgID.(messageID)
        if !ok {
-               c.log.Warnf("invalid message id type")
+               c.log.Warnf("invalid message id type %T", msgID)
                return
        }
 
@@ -183,9 +183,9 @@ func (c *regexConsumer) Nack(msg Message) {
 }
 
 func (c *regexConsumer) NackID(msgID MessageID) {
-       mid, ok := msgID.(*messageID)
+       mid, ok := msgID.(messageID)
        if !ok {
-               c.log.Warnf("invalid message id type")
+               c.log.Warnf("invalid message id type %T", msgID)
                return
        }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d9574cd..f1a9a7c 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -38,7 +38,11 @@ type messageID struct {
        consumer acker
 }
 
-func (id *messageID) Ack() {
+func (id messageID) IsZero() bool {
+       return id == messageID{}
+}
+
+func (id messageID) Ack() {
        if id.consumer == nil {
                return
        }
@@ -47,21 +51,21 @@ func (id *messageID) Ack() {
        }
 }
 
-func (id *messageID) Nack() {
+func (id messageID) Nack() {
        if id.consumer == nil {
                return
        }
        id.consumer.NackID(id)
 }
 
-func (id *messageID) ack() bool {
+func (id messageID) ack() bool {
        if id.tracker != nil && id.batchIdx > -1 {
                return id.tracker.ack(int(id.batchIdx))
        }
        return true
 }
 
-func (id *messageID) greater(other *messageID) bool {
+func (id messageID) greater(other messageID) bool {
        if id.ledgerID != other.ledgerID {
                return id.ledgerID > other.ledgerID
        }
@@ -73,22 +77,22 @@ func (id *messageID) greater(other *messageID) bool {
        return id.batchIdx > other.batchIdx
 }
 
-func (id *messageID) equal(other *messageID) bool {
+func (id messageID) equal(other messageID) bool {
        return id.ledgerID == other.ledgerID &&
                id.entryID == other.entryID &&
                id.batchIdx == other.batchIdx
 }
 
-func (id *messageID) greaterEqual(other *messageID) bool {
+func (id messageID) greaterEqual(other messageID) bool {
        return id.equal(other) || id.greater(other)
 }
 
-func (id *messageID) Serialize() []byte {
+func (id messageID) Serialize() []byte {
        msgID := &pb.MessageIdData{
                LedgerId:   proto.Uint64(uint64(id.ledgerID)),
                EntryId:    proto.Uint64(uint64(id.entryID)),
-               BatchIndex: proto.Int(int(id.batchIdx)),
-               Partition:  proto.Int(int(id.partitionIdx)),
+               BatchIndex: proto.Int32(id.batchIdx),
+               Partition:  proto.Int32(id.partitionIdx),
        }
        data, _ := proto.Marshal(msgID)
        return data
@@ -110,7 +114,7 @@ func deserializeMessageID(data []byte) (MessageID, error) {
 }
 
 func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx 
int32) MessageID {
-       return &messageID{
+       return messageID{
                ledgerID:     ledgerID,
                entryID:      entryID,
                batchIdx:     batchIdx,
@@ -119,8 +123,8 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx 
int32, partitionIdx in
 }
 
 func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, 
partitionIdx int32,
-       tracker *ackTracker) *messageID {
-       return &messageID{
+       tracker *ackTracker) messageID {
+       return messageID{
                ledgerID:     ledgerID,
                entryID:      entryID,
                batchIdx:     batchIdx,
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 164cff6..8c66411 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -31,10 +31,10 @@ func TestMessageId(t *testing.T) {
        assert.NoError(t, err)
        assert.NotNil(t, id2)
 
-       assert.Equal(t, int64(1), id2.(*messageID).ledgerID)
-       assert.Equal(t, int64(2), id2.(*messageID).entryID)
-       assert.Equal(t, int32(3), id2.(*messageID).batchIdx)
-       assert.Equal(t, int32(4), id2.(*messageID).partitionIdx)
+       assert.Equal(t, int64(1), id2.(messageID).ledgerID)
+       assert.Equal(t, int64(2), id2.(messageID).entryID)
+       assert.Equal(t, int32(3), id2.(messageID).batchIdx)
+       assert.Equal(t, int32(4), id2.(messageID).partitionIdx)
 
        id, err = DeserializeMessageID(nil)
        assert.Error(t, err)
@@ -82,7 +82,7 @@ func TestAckingMessageIDBatchOne(t *testing.T) {
 
 func TestAckingMessageIDBatchTwo(t *testing.T) {
        tracker := newAckTracker(2)
-       ids := []*messageID{
+       ids := []messageID{
                newTrackingMessageID(1, 1, 0, 0, tracker),
                newTrackingMessageID(1, 1, 1, 0, tracker),
        }
@@ -93,7 +93,7 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
 
        // try reverse order
        tracker = newAckTracker(2)
-       ids = []*messageID{
+       ids = []messageID{
                newTrackingMessageID(1, 1, 0, 0, tracker),
                newTrackingMessageID(1, 1, 1, 0, tracker),
        }
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index f887844..a7dc88e 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -51,7 +51,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay 
time.Duration) *negativ
        return t
 }
 
-func (t *negativeAcksTracker) Add(msgID *messageID) {
+func (t *negativeAcksTracker) Add(msgID messageID) {
        // Always clear up the batch index since we want to track the nack
        // for the entire batch
        batchMsgID := messageID{
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index f7a1c50..3930b7f 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -76,13 +76,13 @@ func TestNacksTracker(t *testing.T) {
        nmc := newNackMockedConsumer()
        nacks := newNegativeAcksTracker(nmc, testNackDelay)
 
-       nacks.Add(&messageID{
+       nacks.Add(messageID{
                ledgerID: 1,
                entryID:  1,
                batchIdx: 1,
        })
 
-       nacks.Add(&messageID{
+       nacks.Add(messageID{
                ledgerID: 2,
                entryID:  2,
                batchIdx: 1,
@@ -107,25 +107,25 @@ func TestNacksWithBatchesTracker(t *testing.T) {
        nmc := newNackMockedConsumer()
        nacks := newNegativeAcksTracker(nmc, testNackDelay)
 
-       nacks.Add(&messageID{
+       nacks.Add(messageID{
                ledgerID: 1,
                entryID:  1,
                batchIdx: 1,
        })
 
-       nacks.Add(&messageID{
+       nacks.Add(messageID{
                ledgerID: 1,
                entryID:  1,
                batchIdx: 2,
        })
 
-       nacks.Add(&messageID{
+       nacks.Add(messageID{
                ledgerID: 1,
                entryID:  1,
                batchIdx: 3,
        })
 
-       nacks.Add(&messageID{
+       nacks.Add(messageID{
                ledgerID: 2,
                entryID:  2,
                batchIdx: 1,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 95e4ed1..8d389cb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -322,7 +322,7 @@ func TestFlushInProducer(t *testing.T) {
                assert.Nil(t, err)
                msgCount++
 
-               msgID := msg.ID().(*messageID)
+               msgID := msg.ID().(messageID)
                // Since messages are batched, they will be sharing the same 
ledgerId/entryId
                if ledgerID == -1 {
                        ledgerID = msgID.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index b3399dc..b74b35b 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -30,7 +30,7 @@ const (
 type reader struct {
        pc                  *partitionConsumer
        messageCh           chan ConsumerMessage
-       lastMessageInBroker *messageID
+       lastMessageInBroker messageID
 
        log *log.Entry
 }
@@ -44,17 +44,17 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                return nil, newError(ResultInvalidConfiguration, 
"StartMessageID is required")
        }
 
-       var startMessageID *messageID
+       var startMessageID messageID
        var ok bool
-       if startMessageID, ok = options.StartMessageID.(*messageID); !ok {
-               // a custom type satisfying MessageID may not be a *messageID
-               // so re-create *messageID using its data
+       if startMessageID, ok = options.StartMessageID.(messageID); !ok {
+               // a custom type satisfying MessageID may not be a messageID
+               // so re-create messageID using its data
                deserMsgID, err := 
deserializeMessageID(options.StartMessageID.Serialize())
                if err != nil {
                        return nil, err
                }
-               // de-serialized MessageID is a *messageID
-               startMessageID = deserMsgID.(*messageID)
+               // de-serialized MessageID is a messageID
+               startMessageID = deserMsgID.(messageID)
        }
 
        subscriptionName := options.SubscriptionRolePrefix
@@ -118,7 +118,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) 
{
 
                        // Acknowledge message immediately because the reader 
is based on non-durable subscription. When it reconnects,
                        // it will specify the subscription position anyway
-                       msgID := cm.Message.ID().(*messageID)
+                       msgID := cm.Message.ID().(messageID)
                        r.pc.lastDequeuedMsg = msgID
                        r.pc.AckID(msgID)
                        return cm.Message, nil
@@ -129,7 +129,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) 
{
 }
 
 func (r *reader) HasNext() bool {
-       if r.lastMessageInBroker != nil && r.hasMoreMessages() {
+       if !r.lastMessageInBroker.IsZero() && r.hasMoreMessages() {
                return true
        }
 
@@ -148,7 +148,7 @@ func (r *reader) HasNext() bool {
 }
 
 func (r *reader) hasMoreMessages() bool {
-       if r.pc.lastDequeuedMsg != nil {
+       if !r.pc.lastDequeuedMsg.IsZero() {
                return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg)
        }
 

Reply via email to