This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb64f0  Added a message id tracker for acking messages that are batched. (#82)
1eb64f0 is described below

commit 1eb64f05d980d9d7e3b47a88351c53acd80796ef
Author: cckellogg <[email protected]>
AuthorDate: Mon Nov 11 09:37:04 2019 -0800

    Added a message id tracker for acking messages that are batched. (#82)
    
    * Added a message id tracker for acking messages that are batched.
    
    * Update ack tracker functions.
---
 pulsar/impl_message.go      | 77 ++++++++++++++++++++++++++++++++++++---------
 pulsar/impl_message_test.go | 57 +++++++++++++++++++++++++++++++++
 pulsar/message.go           | 19 ++++++-----
 3 files changed, 132 insertions(+), 21 deletions(-)

diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 38d372a..a8e079d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,30 +18,31 @@
 package pulsar
 
 import (
+       "math/big"
+       "strings"
+       "sync"
        "time"
 
-       "github.com/apache/pulsar-client-go/pkg/pb"
        "github.com/golang/protobuf/proto"
-)
 
-func earliestMessageID() MessageID {
-       return newMessageID(-1, -1, -1, -1)
-}
+       "github.com/apache/pulsar-client-go/pkg/pb"
+)
 
 type messageID struct {
        ledgerID     int64
        entryID      int64
        batchIdx     int
        partitionIdx int
+
+       tracker *ackTracker
 }
 
-func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
-       return &messageID{
-               ledgerID:     ledgerID,
-               entryID:      entryID,
-               batchIdx:     batchIdx,
-               partitionIdx: partitionIdx,
+func (id *messageID) ack() bool {
+       if id.tracker != nil && id.batchIdx > -1 {
+               return id.tracker.ack(id.batchIdx)
        }
+
+       return true
 }
 
 func (id *messageID) Serialize() []byte {
@@ -70,10 +71,24 @@ func deserializeMessageID(data []byte) (MessageID, error) {
        return id, nil
 }
 
-const maxLong int64 = 0x7fffffffffffffff
+func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
+       return &messageID{
+               ledgerID:     ledgerID,
+               entryID:      entryID,
+               batchIdx:     batchIdx,
+               partitionIdx: partitionIdx,
+       }
+}
 
-func latestMessageID() MessageID {
-       return newMessageID(maxLong, maxLong, -1, -1)
+func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int,
+       tracker *ackTracker) *messageID {
+       return &messageID{
+               ledgerID:     ledgerID,
+               entryID:      entryID,
+               batchIdx:     batchIdx,
+               partitionIdx: partitionIdx,
+               tracker:      tracker,
+       }
 }
 
 func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
@@ -126,3 +141,37 @@ func (msg *message) EventTime() time.Time {
 func (msg *message) Key() string {
        return msg.key
 }
+
+func newAckTracker(size int) *ackTracker {
+       var batchIDs *big.Int
+       if size <= 64 {
+               shift := uint32(64 - size)
+               setBits := ^uint64(0) >> shift
+               batchIDs = new(big.Int).SetUint64(setBits)
+       } else {
+               batchIDs, _ = new(big.Int).SetString(strings.Repeat("1", size), 2)
+       }
+       return &ackTracker{
+               size:     size,
+               batchIDs: batchIDs,
+       }
+}
+
+type ackTracker struct {
+       sync.Mutex
+       size     int
+       batchIDs *big.Int
+}
+
+func (t *ackTracker) ack(batchID int) bool {
+       t.Lock()
+       defer t.Unlock()
+       t.batchIDs = t.batchIDs.SetBit(t.batchIDs, batchID, 0)
+       return len(t.batchIDs.Bits()) == 0
+}
+
+func (t *ackTracker) completed() bool {
+       t.Lock()
+       defer t.Unlock()
+       return len(t.batchIDs.Bits()) == 0
+}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index bd48495..e4ba3c1 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -44,3 +44,60 @@ func TestMessageId(t *testing.T) {
        assert.Error(t, err)
        assert.Nil(t, id)
 }
+
+func TestAckTracker(t *testing.T) {
+       tracker := newAckTracker(1)
+       assert.Equal(t, true, tracker.ack(0))
+
+       // test 64
+       tracker = newAckTracker(64)
+       for i := 0; i < 64; i++ {
+               if i < 63 {
+                       assert.Equal(t, false, tracker.ack(i))
+               } else {
+                       assert.Equal(t, true, tracker.ack(i))
+               }
+       }
+       assert.Equal(t, true, tracker.completed())
+
+       // test large number 1000
+       tracker = newAckTracker(1000)
+       for i := 0; i < 1000; i++ {
+               if i < 999 {
+                       assert.Equal(t, false, tracker.ack(i))
+               } else {
+                       assert.Equal(t, true, tracker.ack(i))
+               }
+
+       }
+       assert.Equal(t, true, tracker.completed())
+}
+
+func TestAckingMessageIDBatchOne(t *testing.T) {
+       tracker := newAckTracker(1)
+       msgId := newTrackingMessageID(1, 1, 0, 0, tracker)
+       assert.Equal(t, true, msgId.ack())
+       assert.Equal(t, true, tracker.completed())
+}
+
+func TestAckingMessageIDBatchTwo(t *testing.T) {
+       tracker := newAckTracker(2)
+       ids := []*messageID{
+               newTrackingMessageID(1, 1, 0, 0, tracker),
+               newTrackingMessageID(1, 1, 1, 0, tracker),
+       }
+
+       assert.Equal(t, false, ids[0].ack())
+       assert.Equal(t, true, ids[1].ack())
+       assert.Equal(t, true, tracker.completed())
+
+       // try reverse order
+       tracker = newAckTracker(2)
+       ids = []*messageID{
+               newTrackingMessageID(1, 1, 0, 0, tracker),
+               newTrackingMessageID(1, 1, 1, 0, tracker),
+       }
+       assert.Equal(t, false, ids[1].ack())
+       assert.Equal(t, true, ids[0].ack())
+       assert.Equal(t, true, tracker.completed())
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index a70827d..dd4fcff 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "time"
+import (
+       "math"
+       "time"
+)
 
 // ProducerMessage abstraction used in Pulsar producer
 type ProducerMessage struct {
@@ -79,10 +82,12 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
        return deserializeMessageID(data)
 }
 
-var (
-       // EarliestMessage messageID that points to the earliest message available in a topic
-       EarliestMessage = earliestMessageID()
+// EarliestMessageID returns a messageID that points to the earliest message available in a topic
+func EarliestMessageID() MessageID {
+       return newMessageID(-1, -1, -1, -1)
+}
 
-       // LatestMessage messageID that points to the latest message
-       LatestMessage = latestMessageID()
-)
+// LatestMessageID returns a messageID that points to the latest message
+func LatestMessageID() MessageID {
+       return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+}

Reply via email to