This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c6d905d  Different MessageID implementations for message Production 
and Consumption (#324)
c6d905d is described below

commit c6d905ddc6e05f0ddd45689f5a46cb9cfa062b71
Author: dferstay <[email protected]>
AuthorDate: Thu Jul 23 19:17:29 2020 -0700

    Different MessageID implementations for message Production and Consumption 
(#324)
    
    This change splits the `MessageID` implementation in two:
    1. `messageID` - A 24 byte structure that contains message identification
                     information only; to be used during message production
    2. `trackingMessageID` - A 72 byte structure that shares the same
                             message identification information as `messageID`
                             and adds `ackTracker`, `acker`, and `receivedTime`
                             fields; to be used during message consumption
    
    Micro benchmarks show that passing arguments by value is optimized by the
    Go runtime when they occupy less than four words of memory.  Results from
    the pulsar/impl_message_bench_test.go module are below.
    
    ```
    name            time/op
    ProducerCall    1.46ns ± 5%
    ProducerCall-4  1.47ns ± 5%
    ConsumerCall    7.62ns ± 1%
    ConsumerCall-4  7.53ns ± 5%
    ```
    
    Co-authored-by: Daniel Ferstay <[email protected]>
---
 pulsar/consumer_impl.go           |  14 ++---
 pulsar/consumer_multitopic.go     |   4 +-
 pulsar/consumer_partition.go      | 104 +++++++++++++++++++++-----------------
 pulsar/consumer_partition_test.go |  14 ++---
 pulsar/consumer_regex.go          |   4 +-
 pulsar/impl_message.go            |  41 +++++++++++----
 pulsar/impl_message_bench_test.go |  49 ++++++++++++++++++
 pulsar/impl_message_test.go       |   4 +-
 pulsar/producer_test.go           |   2 +-
 pulsar/reader_impl.go             |  37 ++++++++------
 10 files changed, 180 insertions(+), 93 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ef93037..b5de1f9 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -56,8 +56,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
 const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
-       AckID(id messageID)
-       NackID(id messageID)
+       AckID(id trackingMessageID)
+       NackID(id trackingMessageID)
 }
 
 type consumer struct {
@@ -263,7 +263,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                nackRedeliveryDelay:        nackRedeliveryDelay,
                                metadata:                   metadata,
                                replicateSubscriptionState: 
c.options.ReplicateSubscriptionState,
-                               startMessageID:             messageID{},
+                               startMessageID:             trackingMessageID{},
                                subscriptionMode:           durable,
                                readCompacted:              
c.options.ReadCompacted,
                                interceptors:               
c.options.Interceptors,
@@ -488,11 +488,11 @@ func toProtoInitialPosition(p 
SubscriptionInitialPosition) pb.CommandSubscribe_I
        return pb.CommandSubscribe_Latest
 }
 
-func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
-       mid, ok := msgID.(messageID)
+func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
+       mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
-               return messageID{}, false
+               return trackingMessageID{}, false
        }
 
        partition := int(mid.partitionIdx)
@@ -500,7 +500,7 @@ func (c *consumer) messageID(msgID MessageID) (messageID, 
bool) {
        if partition < 0 || partition >= len(c.consumers) {
                c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
                        partition, len(c.consumers))
-               return messageID{}, false
+               return trackingMessageID{}, false
        }
 
        return mid, true
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index f526487..8d34203 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -120,7 +120,7 @@ func (c *multiTopicConsumer) Ack(msg Message) {
 
 // Ack the consumption of a single message, identified by its MessageID
 func (c *multiTopicConsumer) AckID(msgID MessageID) {
-       mid, ok := msgID.(messageID)
+       mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
                return
@@ -139,7 +139,7 @@ func (c *multiTopicConsumer) Nack(msg Message) {
 }
 
 func (c *multiTopicConsumer) NackID(msgID MessageID) {
-       mid, ok := msgID.(messageID)
+       mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
                return
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b83cb88..b3fc17c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -109,7 +109,7 @@ type partitionConsumerOpts struct {
        nackRedeliveryDelay        time.Duration
        metadata                   map[string]string
        replicateSubscriptionState bool
-       startMessageID             messageID
+       startMessageID             trackingMessageID
        startMessageIDInclusive    bool
        subscriptionMode           subscriptionMode
        readCompacted              bool
@@ -141,13 +141,13 @@ type partitionConsumer struct {
        // the size of the queue channel for buffering messages
        queueSize       int32
        queueCh         chan []*message
-       startMessageID  messageID
-       lastDequeuedMsg messageID
+       startMessageID  trackingMessageID
+       lastDequeuedMsg trackingMessageID
 
        eventsCh     chan interface{}
        connectedCh  chan struct{}
        closeCh      chan struct{}
-       clearQueueCh chan func(id messageID)
+       clearQueueCh chan func(id trackingMessageID)
 
        nackTracker *negativeAcksTracker
        dlq         *dlqRouter
@@ -175,7 +175,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                connectedCh:          make(chan struct{}),
                messageCh:            messageCh,
                closeCh:              make(chan struct{}),
-               clearQueueCh:         make(chan func(id messageID)),
+               clearQueueCh:         make(chan func(id trackingMessageID)),
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                dlq:                  dlq,
                log:                  log.WithField("topic", options.topic),
@@ -241,7 +241,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*unsubscribeRequest) {
        pc.state = consumerClosed
 }
 
-func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
+func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
        req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
        pc.eventsCh <- req
 
@@ -269,8 +269,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest)
        }
 }
 
-func (pc *partitionConsumer) AckID(msgID messageID) {
-       if !msgID.IsZero() && msgID.ack() {
+func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+       if !msgID.Undefined() && msgID.ack() {
                acksCounter.Inc()
                
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano())
 / 1.0e9)
                req := &ackRequest{
@@ -282,8 +282,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
        }
 }
 
-func (pc *partitionConsumer) NackID(msgID messageID) {
-       pc.nackTracker.Add(msgID)
+func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
+       pc.nackTracker.Add(msgID.messageID)
        nacksCounter.Inc()
 }
 
@@ -328,7 +328,7 @@ func (pc *partitionConsumer) Close() {
        <-req.doneCh
 }
 
-func (pc *partitionConsumer) Seek(msgID messageID) error {
+func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
        req := &seekRequest{
                doneCh: make(chan struct{}),
                msgID:  msgID,
@@ -522,17 +522,17 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        return nil
 }
 
-func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
-       if pc.startMessageID.IsZero() {
+func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) 
bool {
+       if pc.startMessageID.Undefined() {
                return false
        }
 
        if pc.options.startMessageIDInclusive {
-               return pc.startMessageID.greater(msgID)
+               return pc.startMessageID.greater(msgID.messageID)
        }
 
        // Non inclusive
-       return pc.startMessageID.greaterEqual(msgID)
+       return pc.startMessageID.greaterEqual(msgID.messageID)
 }
 
 func (pc *partitionConsumer) ConnectionClosed() {
@@ -647,7 +647,7 @@ func (pc *partitionConsumer) dispatcher() {
                case clearQueueCb := <-pc.clearQueueCh:
                        // drain the message queue on any new connection by 
sending a
                        // special nil message to the channel so we know when 
to stop dropping messages
-                       var nextMessageInQueue messageID
+                       var nextMessageInQueue trackingMessageID
                        go func() {
                                pc.queueCh <- nil
                        }()
@@ -655,8 +655,8 @@ func (pc *partitionConsumer) dispatcher() {
                                // the queue has been drained
                                if m == nil {
                                        break
-                               } else if nextMessageInQueue.IsZero() {
-                                       nextMessageInQueue = 
m[0].msgID.(messageID)
+                               } else if nextMessageInQueue.Undefined() {
+                                       nextMessageInQueue = 
m[0].msgID.(trackingMessageID)
                                }
                        }
 
@@ -666,7 +666,7 @@ func (pc *partitionConsumer) dispatcher() {
 }
 
 type ackRequest struct {
-       msgID messageID
+       msgID trackingMessageID
 }
 
 type unsubscribeRequest struct {
@@ -684,13 +684,13 @@ type redeliveryRequest struct {
 
 type getLastMsgIDRequest struct {
        doneCh chan struct{}
-       msgID  messageID
+       msgID  trackingMessageID
        err    error
 }
 
 type seekRequest struct {
        doneCh chan struct{}
-       msgID  messageID
+       msgID  trackingMessageID
        err    error
 }
 
@@ -870,15 +870,15 @@ func (pc *partitionConsumer) grabConn() error {
        }
 }
 
-func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
+func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
        if pc.state != consumerReady {
-               return messageID{}
+               return trackingMessageID{}
        }
        wg := &sync.WaitGroup{}
        wg.Add(1)
-       var msgID messageID
+       var msgID trackingMessageID
 
-       pc.clearQueueCh <- func(id messageID) {
+       pc.clearQueueCh <- func(id trackingMessageID) {
                msgID = id
                wg.Done()
        }
@@ -891,12 +891,12 @@ func (pc *partitionConsumer) 
clearQueueAndGetNextMessage() messageID {
  * Clear the internal receiver queue and returns the message id of what was 
the 1st message in the queue that was
  * not seen by the application
  */
-func (pc *partitionConsumer) clearReceiverQueue() messageID {
+func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
        nextMessageInQueue := pc.clearQueueAndGetNextMessage()
 
-       if !nextMessageInQueue.IsZero() {
+       if !nextMessageInQueue.Undefined() {
                return getPreviousMessage(nextMessageInQueue)
-       } else if !pc.lastDequeuedMsg.IsZero() {
+       } else if !pc.lastDequeuedMsg.Undefined() {
                // If the queue was empty we need to restart from the message 
just after the last one that has been dequeued
                // in the past
                return pc.lastDequeuedMsg
@@ -906,22 +906,32 @@ func (pc *partitionConsumer) clearReceiverQueue() 
messageID {
        }
 }
 
-func getPreviousMessage(mid messageID) messageID {
+func getPreviousMessage(mid trackingMessageID) trackingMessageID {
        if mid.batchIdx >= 0 {
-               return messageID{
-                       ledgerID:     mid.ledgerID,
-                       entryID:      mid.entryID,
-                       batchIdx:     mid.batchIdx - 1,
-                       partitionIdx: mid.partitionIdx,
+               return trackingMessageID{
+                       messageID: messageID{
+                               ledgerID:     mid.ledgerID,
+                               entryID:      mid.entryID,
+                               batchIdx:     mid.batchIdx - 1,
+                               partitionIdx: mid.partitionIdx,
+                       },
+                       tracker:      mid.tracker,
+                       consumer:     mid.consumer,
+                       receivedTime: mid.receivedTime,
                }
        }
 
        // Get on previous message in previous entry
-       return messageID{
-               ledgerID:     mid.ledgerID,
-               entryID:      mid.entryID - 1,
-               batchIdx:     mid.batchIdx,
-               partitionIdx: mid.partitionIdx,
+       return trackingMessageID{
+               messageID: messageID{
+                       ledgerID:     mid.ledgerID,
+                       entryID:      mid.entryID - 1,
+                       batchIdx:     mid.batchIdx,
+                       partitionIdx: mid.partitionIdx,
+               },
+               tracker:      mid.tracker,
+               consumer:     mid.consumer,
+               receivedTime: mid.receivedTime,
        }
 }
 
@@ -977,8 +987,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID 
*pb.MessageIdData,
                })
 }
 
-func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
-       if msgID.IsZero() {
+func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
+       if msgID.Undefined() {
                return nil
        }
 
@@ -988,14 +998,16 @@ func convertToMessageIDData(msgID messageID) 
*pb.MessageIdData {
        }
 }
 
-func convertToMessageID(id *pb.MessageIdData) messageID {
+func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
        if id == nil {
-               return messageID{}
+               return trackingMessageID{}
        }
 
-       msgID := messageID{
-               ledgerID: int64(*id.LedgerId),
-               entryID:  int64(*id.EntryId),
+       msgID := trackingMessageID{
+               messageID: messageID{
+                       ledgerID: int64(*id.LedgerId),
+                       entryID:  int64(*id.EntryId),
+               },
        }
        if id.BatchIndex != nil {
                msgID.batchIdx = *id.BatchIndex
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index 01831a4..68cbdd6 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -45,11 +45,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
        // ensure the tracker was set on the message id
        messages := <-pc.queueCh
        for _, m := range messages {
-               assert.Nil(t, m.ID().(messageID).tracker)
+               assert.Nil(t, m.ID().(trackingMessageID).tracker)
        }
 
        // ack the message id
-       pc.AckID(messages[0].msgID.(messageID))
+       pc.AckID(messages[0].msgID.(trackingMessageID))
 
        select {
        case <-eventsCh:
@@ -75,11 +75,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
        // ensure the tracker was set on the message id
        messages := <-pc.queueCh
        for _, m := range messages {
-               assert.Nil(t, m.ID().(messageID).tracker)
+               assert.Nil(t, m.ID().(trackingMessageID).tracker)
        }
 
        // ack the message id
-       pc.AckID(messages[0].msgID.(messageID))
+       pc.AckID(messages[0].msgID.(trackingMessageID))
 
        select {
        case <-eventsCh:
@@ -105,12 +105,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
        // ensure the tracker was set on the message id
        messages := <-pc.queueCh
        for _, m := range messages {
-               assert.NotNil(t, m.ID().(messageID).tracker)
+               assert.NotNil(t, m.ID().(trackingMessageID).tracker)
        }
 
        // ack all message ids except the last one
        for i := 0; i < 9; i++ {
-               pc.AckID(messages[i].msgID.(messageID))
+               pc.AckID(messages[i].msgID.(trackingMessageID))
        }
 
        select {
@@ -120,7 +120,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
        }
 
        // ack last message
-       pc.AckID(messages[9].msgID.(messageID))
+       pc.AckID(messages[9].msgID.(trackingMessageID))
 
        select {
        case <-eventsCh:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index a6dfd56..e0fdbcb 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -165,7 +165,7 @@ func (c *regexConsumer) Ack(msg Message) {
 
 // Ack the consumption of a single message, identified by its MessageID
 func (c *regexConsumer) AckID(msgID MessageID) {
-       mid, ok := msgID.(messageID)
+       mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
                return
@@ -184,7 +184,7 @@ func (c *regexConsumer) Nack(msg Message) {
 }
 
 func (c *regexConsumer) NackID(msgID MessageID) {
-       mid, ok := msgID.(messageID)
+       mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
                return
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 562dfb6..d670a37 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -33,17 +33,21 @@ type messageID struct {
        entryID      int64
        batchIdx     int32
        partitionIdx int32
+}
+
+type trackingMessageID struct {
+       messageID
 
        tracker      *ackTracker
        consumer     acker
        receivedTime time.Time
 }
 
-func (id messageID) IsZero() bool {
-       return id == messageID{}
+func (id trackingMessageID) Undefined() bool {
+       return id == trackingMessageID{}
 }
 
-func (id messageID) Ack() {
+func (id trackingMessageID) Ack() {
        if id.consumer == nil {
                return
        }
@@ -52,14 +56,14 @@ func (id messageID) Ack() {
        }
 }
 
-func (id messageID) Nack() {
+func (id trackingMessageID) Nack() {
        if id.consumer == nil {
                return
        }
        id.consumer.NackID(id)
 }
 
-func (id messageID) ack() bool {
+func (id trackingMessageID) ack() bool {
        if id.tracker != nil && id.batchIdx > -1 {
                return id.tracker.ack(int(id.batchIdx))
        }
@@ -124,17 +128,32 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx 
int32, partitionIdx in
 }
 
 func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, 
partitionIdx int32,
-       tracker *ackTracker) messageID {
-       return messageID{
-               ledgerID:     ledgerID,
-               entryID:      entryID,
-               batchIdx:     batchIdx,
-               partitionIdx: partitionIdx,
+       tracker *ackTracker) trackingMessageID {
+       return trackingMessageID{
+               messageID: messageID{
+                       ledgerID:     ledgerID,
+                       entryID:      entryID,
+                       batchIdx:     batchIdx,
+                       partitionIdx: partitionIdx,
+               },
                tracker:      tracker,
                receivedTime: time.Now(),
        }
 }
 
+func toTrackingMessageID(msgID MessageID) (trackingMessageID, bool) {
+       if mid, ok := msgID.(messageID); ok {
+               return trackingMessageID{
+                       messageID:    mid,
+                       receivedTime: time.Now(),
+               }, true
+       } else if mid, ok := msgID.(trackingMessageID); ok {
+               return mid, true
+       } else {
+               return trackingMessageID{}, false
+       }
+}
+
 func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
        ts := int64(timestamp) * int64(time.Millisecond)
        seconds := ts / int64(time.Second)
diff --git a/pulsar/impl_message_bench_test.go 
b/pulsar/impl_message_bench_test.go
new file mode 100644
index 0000000..4b6ca10
--- /dev/null
+++ b/pulsar/impl_message_bench_test.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "testing"
+)
+
+var (
+       usedByProducer messageID
+       usedByConsumer trackingMessageID
+)
+
+func producerCall(id messageID) messageID {
+       id.entryID++
+       return id
+}
+
+func consumerCall(id trackingMessageID) trackingMessageID {
+       id.entryID++
+       return id
+}
+
+func BenchmarkProducerCall(b *testing.B) {
+       for i := 0; i < b.N; i++ {
+               usedByProducer = producerCall(usedByProducer)
+       }
+}
+
+func BenchmarkConsumerCall(b *testing.B) {
+       for i := 0; i < b.N; i++ {
+               usedByConsumer = consumerCall(usedByConsumer)
+       }
+}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 8c66411..4e8b644 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -82,7 +82,7 @@ func TestAckingMessageIDBatchOne(t *testing.T) {
 
 func TestAckingMessageIDBatchTwo(t *testing.T) {
        tracker := newAckTracker(2)
-       ids := []messageID{
+       ids := []trackingMessageID{
                newTrackingMessageID(1, 1, 0, 0, tracker),
                newTrackingMessageID(1, 1, 1, 0, tracker),
        }
@@ -93,7 +93,7 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
 
        // try reverse order
        tracker = newAckTracker(2)
-       ids = []messageID{
+       ids = []trackingMessageID{
                newTrackingMessageID(1, 1, 0, 0, tracker),
                newTrackingMessageID(1, 1, 1, 0, tracker),
        }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index d06ddbb..20cfd91 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -322,7 +322,7 @@ func TestFlushInProducer(t *testing.T) {
                assert.Nil(t, err)
                msgCount++
 
-               msgID := msg.ID().(messageID)
+               msgID := msg.ID().(trackingMessageID)
                // Since messages are batched, they will be sharing the same 
ledgerId/entryId
                if ledgerID == -1 {
                        ledgerID = msgID.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index d97cc96..474d0db 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -19,6 +19,8 @@ package pulsar
 
 import (
        "context"
+       "fmt"
+       "time"
 
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
@@ -45,7 +47,7 @@ var (
 type reader struct {
        pc                  *partitionConsumer
        messageCh           chan ConsumerMessage
-       lastMessageInBroker messageID
+       lastMessageInBroker trackingMessageID
 
        log *log.Entry
 }
@@ -59,17 +61,19 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                return nil, newError(ResultInvalidConfiguration, 
"StartMessageID is required")
        }
 
-       var startMessageID messageID
-       var ok bool
-       if startMessageID, ok = options.StartMessageID.(messageID); !ok {
-               // a custom type satisfying MessageID may not be a messageID
+       startMessageID, ok := toTrackingMessageID(options.StartMessageID)
+       if !ok {
+               // a custom type satisfying MessageID may not be a messageID or 
trackingMessageID
                // so re-create messageID using its data
                deserMsgID, err := 
deserializeMessageID(options.StartMessageID.Serialize())
                if err != nil {
                        return nil, err
                }
                // de-serialized MessageID is a messageID
-               startMessageID = deserMsgID.(messageID)
+               startMessageID = trackingMessageID{
+                       messageID:    deserMsgID.(messageID),
+                       receivedTime: time.Now(),
+               }
        }
 
        subscriptionName := options.SubscriptionRolePrefix
@@ -134,10 +138,13 @@ func (r *reader) Next(ctx context.Context) (Message, 
error) {
 
                        // Acknowledge message immediately because the reader 
is based on non-durable subscription. When it reconnects,
                        // it will specify the subscription position anyway
-                       msgID := cm.Message.ID().(messageID)
-                       r.pc.lastDequeuedMsg = msgID
-                       r.pc.AckID(msgID)
-                       return cm.Message, nil
+                       msgID := cm.Message.ID()
+                       if mid, ok := toTrackingMessageID(msgID); ok {
+                               r.pc.lastDequeuedMsg = mid
+                               r.pc.AckID(mid)
+                               return cm.Message, nil
+                       }
+                       return nil, fmt.Errorf("invalid message id type %T", 
msgID)
                case <-ctx.Done():
                        return nil, ctx.Err()
                }
@@ -145,7 +152,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) 
{
 }
 
 func (r *reader) HasNext() bool {
-       if !r.lastMessageInBroker.IsZero() && r.hasMoreMessages() {
+       if !r.lastMessageInBroker.Undefined() && r.hasMoreMessages() {
                return true
        }
 
@@ -164,16 +171,16 @@ func (r *reader) HasNext() bool {
 }
 
 func (r *reader) hasMoreMessages() bool {
-       if !r.pc.lastDequeuedMsg.IsZero() {
-               return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg)
+       if !r.pc.lastDequeuedMsg.Undefined() {
+               return 
r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
        }
 
        if r.pc.options.startMessageIDInclusive {
-               return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID)
+               return 
r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
        }
 
        // Non-inclusive
-       return r.lastMessageInBroker.greater(r.pc.startMessageID)
+       return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
 }
 
 func (r *reader) Close() {

Reply via email to