This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 352c463  Optimize batch index ACK performance (#988)
352c463 is described below

commit 352c463194916cab7772c0729c9cdac14404e8bd
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Fri Mar 10 09:57:38 2023 +0800

    Optimize batch index ACK performance (#988)
    
    ### Motivation
    
    Currently, when `EnableBatchIndexAck` is true, the ACK performance is
    very poor. There are two main reasons:
    1. Acknowledgment by list is not supported. It means that even if N
       MessageIDs are grouped, there are still N ACK requests to send.
    2. The implementation of the ACK grouping tracker is wrong. Given a
       batch that has N messages, when batch index ACK is enabled, each
       MessageID is cached. However, after all N of these MessageIDs have
       arrived, the current implementation does not clear them.
    
    ### Modifications
    - Add a `func(ids []*pb.MessageIdData)` callback to the ACK grouping
      tracker. When flushing individual ACKs, construct the slice and wrap
      it in a `CommandAck` directly.
    - Refactor the implementation of the ACK grouping tracker:
      - Do not save each MessageID instance; instead, use the pair of the
        ledger id and the entry id as the key of `pendingAcks` (see the
        sketch below).
      - Release the mutex before calling the ACK functions.
    - Add `TestTrackerPendingAcks` to verify the list of MessageIDs to ACK.
    
    After this change, the ACK order cannot be guaranteed, so the
    acknowledged MessageIDs are sorted in `ack_grouping_tracker_test.go`.
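    
    For illustration, here is a minimal standalone sketch of the new
    `pendingAcks` bookkeeping. The helper name `trackBatchAck` and the
    `main` scaffolding are hypothetical; only the key scheme and the
    bitset semantics mirror the patch:
    
    ```go
    package main
    
    import (
        "fmt"
    
        "github.com/bits-and-blooms/bitset"
    )
    
    // trackBatchAck mirrors tryAddIndividual: the (ledger id, entry id)
    // pair keys the entry, every bit starts at 1 (not yet acknowledged),
    // and acknowledging a batch index clears its bit to 0.
    func trackBatchAck(pending map[[2]uint64]*bitset.BitSet,
        ledgerID, entryID uint64, batchIdx, batchSize int32) {
        key := [2]uint64{ledgerID, entryID}
        bs, found := pending[key]
        if !found {
            if batchSize > 1 {
                bs = bitset.New(uint(batchSize))
                for i := uint(0); i < uint(batchSize); i++ {
                    bs.Set(i)
                }
            }
            pending[key] = bs // nil marks a non-batched (single) message
        }
        if bs != nil {
            bs.Clear(uint(batchIdx))
        }
    }
    
    func main() {
        pending := make(map[[2]uint64]*bitset.BitSet)
        trackBatchAck(pending, 1, 0, 0, 3) // ack batch index 0 of a 3-message batch
        fmt.Println(pending[[2]uint64{1, 0}]) // {1,2}: indexes 1 and 2 remain pending
    }
    ```
    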
---
 pulsar/ack_grouping_tracker.go      | 281 ++++++++++++++++++------------------
 pulsar/ack_grouping_tracker_test.go |  54 +++++--
 pulsar/consumer_partition.go        |  13 +-
 pulsar/consumer_partition_test.go   |   6 +-
 pulsar/message.go                   |   8 -
 5 files changed, 196 insertions(+), 166 deletions(-)

diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index 22aba11..dd05186 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -19,8 +19,10 @@ package pulsar
 
 import (
        "sync"
+       "sync/atomic"
        "time"
 
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/bits-and-blooms/bitset"
 )
 
@@ -40,7 +42,8 @@ type ackGroupingTracker interface {
 
 func newAckGroupingTracker(options *AckGroupingOptions,
        ackIndividual func(id MessageID),
-       ackCumulative func(id MessageID)) ackGroupingTracker {
+       ackCumulative func(id MessageID),
+       ackList func(ids []*pb.MessageIdData)) ackGroupingTracker {
        if options == nil {
                options = &AckGroupingOptions{
                        MaxSize: 1000,
@@ -56,38 +59,28 @@ func newAckGroupingTracker(options *AckGroupingOptions,
        }
 
        t := &timedAckGroupingTracker{
-               singleAcks:        make([]MessageID, options.MaxSize),
-               pendingAcks:       make(map[int64]*bitset.BitSet),
-               lastCumulativeAck: EarliestMessageID(),
-               ackIndividual:     ackIndividual,
+               maxNumAcks:        int(options.MaxSize),
                ackCumulative:     ackCumulative,
-               ackList: func(ids []MessageID) {
-                       // TODO: support ack a list of MessageIDs
-                       for _, id := range ids {
-                               ackIndividual(id)
-                       }
-               },
-               options: *options,
-               tick:    time.NewTicker(time.Hour),
-               donCh:   make(chan struct{}),
+               ackList:           ackList,
+               pendingAcks:       make(map[[2]uint64]*bitset.BitSet),
+               lastCumulativeAck: EarliestMessageID(),
        }
 
        if options.MaxTime > 0 {
-               t.tick = time.NewTicker(options.MaxTime)
-       } else {
-               t.tick.Stop()
+               t.ticker = time.NewTicker(options.MaxTime)
+               t.exitCh = make(chan struct{})
+               go func() {
+                       for {
+                               select {
+                               case <-t.exitCh:
+                                       return
+                               case <-t.ticker.C:
+                                       t.flush()
+                               }
+                       }
+               }()
        }
 
-       go func() {
-               for {
-                       select {
-                       case <-t.donCh:
-                               return
-                       case <-t.tick.C:
-                               t.flush()
-                       }
-               }
-       }()
        return t
 }
 
@@ -117,157 +110,157 @@ func (i *immediateAckGroupingTracker) flushAndClean() {
 func (i *immediateAckGroupingTracker) close() {
 }
 
-func (t *timedAckGroupingTracker) addAndCheckIfFull(id MessageID) bool {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       t.singleAcks[t.index] = id
-       t.index++
-       key := messageIDHash(id)
-       ackSet, found := t.pendingAcks[key]
-       if !found {
-               if messageIDIsBatch(id) {
-                       ackSet = bitset.New(uint(id.BatchSize()))
-                       for i := 0; i < int(id.BatchSize()); i++ {
-                               ackSet.Set(uint(i))
-                       }
-                       t.pendingAcks[key] = ackSet
-               } else {
-                       t.pendingAcks[key] = nil
-               }
-       }
-       if ackSet != nil {
-               ackSet.Clear(uint(id.BatchIdx()))
-       }
-       return t.index == len(t.singleAcks)
-}
-
-func (t *timedAckGroupingTracker) tryUpdateLastCumulativeAck(id MessageID) {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if messageIDCompare(t.lastCumulativeAck, id) < 0 {
-               t.lastCumulativeAck = id
-               t.cumulativeAckRequired = true
-       }
-}
-
-func (t *timedAckGroupingTracker) flushIndividualAcks() {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if t.index > 0 {
-               t.ackList(t.singleAcks[0:t.index])
-               for _, id := range t.singleAcks[0:t.index] {
-                       key := messageIDHash(id)
-                       ackSet, found := t.pendingAcks[key]
-                       if !found {
-                               continue
-                       }
-                       if ackSet == nil {
-                               delete(t.pendingAcks, key)
-                       } else {
-                               ackSet.Clear(uint(id.BatchIdx()))
-                       if ackSet.None() { // all messages have been acknowledged
-                                       delete(t.pendingAcks, key)
-                               }
-                       }
-                       delete(t.pendingAcks, messageIDHash(id))
-               }
-               t.index = 0
-       }
-}
-
-func (t *timedAckGroupingTracker) flushCumulativeAck() {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if t.cumulativeAckRequired {
-               t.ackCumulative(t.lastCumulativeAck)
-               t.cumulativeAckRequired = false
-       }
-}
-
-func (t *timedAckGroupingTracker) clean() {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       maxSize := len(t.singleAcks)
-       t.singleAcks = make([]MessageID, maxSize)
-       t.index = 0
-       t.pendingAcks = make(map[int64]*bitset.BitSet)
-       t.lastCumulativeAck = EarliestMessageID()
-       t.cumulativeAckRequired = false
-}
-
 type timedAckGroupingTracker struct {
-       singleAcks []MessageID
-       index      int
+       sync.RWMutex
 
-       // Key is the hash code of the ledger id and the netry id,
+       maxNumAcks    int
+       ackCumulative func(id MessageID)
+       ackList       func(ids []*pb.MessageIdData)
+       ticker        *time.Ticker
+
+       // Key is the pair of the ledger id and the entry id,
        // Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
        // The bit 0 represents the message has been acknowledged, i.e. the bits "111" represent that all
        // messages in a batch whose batch size is 3 are not yet acknowledged.
        // After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
        // Value is nil if the entry represents a single message.
-       pendingAcks map[int64]*bitset.BitSet
+       pendingAcks map[[2]uint64]*bitset.BitSet
 
        lastCumulativeAck     MessageID
-       cumulativeAckRequired bool
-
-       ackIndividual func(id MessageID)
-       ackCumulative func(id MessageID)
-       ackList       func(ids []MessageID)
-
-       options AckGroupingOptions
-       donCh   chan struct{}
-       tick    *time.Ticker
+       cumulativeAckRequired int32
 
-       mutex sync.RWMutex
+       exitCh chan struct{}
 }
 
 func (t *timedAckGroupingTracker) add(id MessageID) {
-       if t.addAndCheckIfFull(id) {
-               t.flushIndividualAcks()
-               if t.options.MaxTime > 0 {
-                       t.tick.Reset(t.options.MaxTime)
+       if acks := t.tryAddIndividual(id); acks != nil {
+               t.flushIndividual(acks)
+       }
+}
+
+func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID) map[[2]uint64]*bitset.BitSet {
+       t.Lock()
+       defer t.Unlock()
+       key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
+
+       batchIdx := id.BatchIdx()
+       batchSize := id.BatchSize()
+
+       if batchIdx >= 0 && batchSize > 0 {
+               bs, found := t.pendingAcks[key]
+               if !found {
+                       if batchSize > 1 {
+                               bs = bitset.New(uint(batchSize))
+                               for i := uint(0); i < uint(batchSize); i++ {
+                                       bs.Set(i)
+                               }
+                       }
+                       t.pendingAcks[key] = bs
                }
+               if bs != nil {
+                       bs.Clear(uint(batchIdx))
+               }
+       } else {
+               t.pendingAcks[key] = nil
        }
+
+       if len(t.pendingAcks) >= t.maxNumAcks {
+               pendingAcks := t.pendingAcks
+               t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
+               return pendingAcks
+       }
+       return nil
 }
 
 func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
-       t.tryUpdateLastCumulativeAck(id)
-       if t.options.MaxTime <= 0 {
-               t.flushCumulativeAck()
+       if t.tryUpdateCumulative(id) && t.ticker == nil {
+               t.ackCumulative(id)
        }
 }
 
+func (t *timedAckGroupingTracker) tryUpdateCumulative(id MessageID) bool {
+       t.Lock()
+       defer t.Unlock()
+       if messageIDCompare(t.lastCumulativeAck, id) < 0 {
+               t.lastCumulativeAck = id
+               atomic.StoreInt32(&t.cumulativeAckRequired, 1)
+               return true
+       }
+       return false
+}
+
 func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
-       t.mutex.RLock()
+       t.RLock()
+       defer t.RUnlock()
        if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
-               t.mutex.RUnlock()
                return true
        }
-       ackSet, found := t.pendingAcks[messageIDHash(id)]
-       if !found {
-               t.mutex.RUnlock()
-               return false
-       }
-       t.mutex.RUnlock()
-       if ackSet == nil || !messageIDIsBatch(id) {
-               // NOTE: should we panic when ackSet != nil and messageIDIsBatch(id) is true?
-               return true
+       key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
+       if bs, found := t.pendingAcks[key]; found {
+               if bs == nil {
+                       return true
+               }
+               if !bs.Test(uint(id.BatchIdx())) {
+                       return true
+               }
        }
-       // 0 represents the message has been acknowledged
-       return !ackSet.Test(uint(id.BatchIdx()))
+       return false
 }
 
 func (t *timedAckGroupingTracker) flush() {
-       t.flushIndividualAcks()
-       t.flushCumulativeAck()
+       if acks := t.clearPendingAcks(); len(acks) > 0 {
+               t.flushIndividual(acks)
+       }
+       if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
+               t.RLock()
+               id := t.lastCumulativeAck
+               t.RUnlock()
+               t.ackCumulative(id)
+       }
 }
 
 func (t *timedAckGroupingTracker) flushAndClean() {
-       t.flush()
-       t.clean()
+       if acks := t.clearPendingAcks(); len(acks) > 0 {
+               t.flushIndividual(acks)
+       }
+       if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
+               t.Lock()
+               id := t.lastCumulativeAck
+               t.lastCumulativeAck = EarliestMessageID()
+               t.Unlock()
+               t.ackCumulative(id)
+       }
+}
+
+func (t *timedAckGroupingTracker) clearPendingAcks() map[[2]uint64]*bitset.BitSet {
+       t.Lock()
+       defer t.Unlock()
+       pendingAcks := t.pendingAcks
+       t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
+       return pendingAcks
 }
 
 func (t *timedAckGroupingTracker) close() {
        t.flushAndClean()
-       close(t.donCh)
+       if t.exitCh != nil {
+               close(t.exitCh)
+       }
+}
+
+func (t *timedAckGroupingTracker) flushIndividual(pendingAcks map[[2]uint64]*bitset.BitSet) {
+       msgIDs := make([]*pb.MessageIdData, 0, len(pendingAcks))
+       for k, v := range pendingAcks {
+               ledgerID := k[0]
+               entryID := k[1]
+               msgID := &pb.MessageIdData{LedgerId: &ledgerID, EntryId: &entryID}
+               if v != nil && !v.None() {
+                       bytes := v.Bytes()
+                       msgID.AckSet = make([]int64, len(bytes))
+                       for i := 0; i < len(bytes); i++ {
+                               msgID.AckSet[i] = int64(bytes[i])
+                       }
+               }
+               msgIDs = append(msgIDs, msgID)
+       }
+       t.ackList(msgIDs)
 }
diff --git a/pulsar/ack_grouping_tracker_test.go b/pulsar/ack_grouping_tracker_test.go
index 41d24d4..0a794f6 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -19,11 +19,13 @@ package pulsar
 
 import (
        "fmt"
+       "sort"
        "sync"
        "sync/atomic"
        "testing"
        "time"
 
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/stretchr/testify/assert"
 )
 
@@ -45,7 +47,8 @@ func TestNoCacheTracker(t *testing.T) {
                                ledgerID1 := int64(-1)
                                tracker := newAckGroupingTracker(&option,
                                        func(id MessageID) { ledgerID0 = id.LedgerID() },
-                                       func(id MessageID) { ledgerID1 = id.LedgerID() })
+                                       func(id MessageID) { ledgerID1 = id.LedgerID() },
+                                       nil)
 
                                tracker.add(&messageID{ledgerID: 1})
                                assert.Equal(t, atomic.LoadInt64(&ledgerID0), int64(1))
@@ -61,10 +64,12 @@ type mockAcker struct {
        cumulativeLedgerID int64
 }
 
-func (a *mockAcker) ack(id MessageID) {
+func (a *mockAcker) ack(ids []*pb.MessageIdData) {
        defer a.Unlock()
        a.Lock()
-       a.ledgerIDs = append(a.ledgerIDs, id.LedgerID())
+       for _, id := range ids {
+               a.ledgerIDs = append(a.ledgerIDs, int64(*id.LedgerId))
+       }
 }
 
 func (a *mockAcker) ackCumulative(id MessageID) {
@@ -74,6 +79,8 @@ func (a *mockAcker) ackCumulative(id MessageID) {
 func (a *mockAcker) getLedgerIDs() []int64 {
        defer a.Unlock()
        a.Lock()
+
+       sort.Slice(a.ledgerIDs, func(i, j int) bool { return a.ledgerIDs[i] < a.ledgerIDs[j] })
        return a.ledgerIDs
 }
 
@@ -88,8 +95,8 @@ func (a *mockAcker) reset() {
 
 func TestCachedTracker(t *testing.T) {
        var acker mockAcker
-       tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
-               func(id MessageID) { acker.ack(id) }, func(id MessageID) { acker.ackCumulative(id) })
+       tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil,
+               func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
 
        tracker.add(&messageID{ledgerID: 1})
        tracker.add(&messageID{ledgerID: 2})
@@ -126,7 +133,8 @@ func TestCachedTracker(t *testing.T) {
 func TestTimedTrackerIndividualAck(t *testing.T) {
        var acker mockAcker
        // MaxSize: 1000, MaxTime: 100ms
-       tracker := newAckGroupingTracker(nil, func(id MessageID) { acker.ack(id) }, nil)
+       tracker := newAckGroupingTracker(nil, nil,
+               func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
 
        expected := make([]int64, 0)
        for i := 0; i < 999; i++ {
@@ -161,7 +169,7 @@ func TestTimedTrackerIndividualAck(t *testing.T) {
 func TestTimedTrackerCumulativeAck(t *testing.T) {
        var acker mockAcker
        // MaxTime is 100ms
-       tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) })
+       tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) }, nil)
 
        // case 1: flush because of the timeout
        tracker.addCumulative(&messageID{ledgerID: 1})
@@ -182,7 +190,8 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
 }
 
 func TestTimedTrackerIsDuplicate(t *testing.T) {
-       tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {})
+       tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {},
+               func(id []*pb.MessageIdData) {})
 
        tracker.add(&messageID{batchIdx: 0, batchSize: 3})
        tracker.add(&messageID{batchIdx: 2, batchSize: 3})
@@ -198,8 +207,8 @@ func TestTimedTrackerIsDuplicate(t *testing.T) {
 
 func TestDuplicateAfterClose(t *testing.T) {
        var acker mockAcker
-       tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
-               func(id MessageID) { acker.ack(id) }, func(id MessageID) { acker.ackCumulative(id) })
+       tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil,
+               func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
 
        tracker.add(&messageID{ledgerID: 1})
        assert.True(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
@@ -207,3 +216,28 @@ func TestDuplicateAfterClose(t *testing.T) {
        tracker.close()
        assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
 }
+
+func TestTrackerPendingAcks(t *testing.T) {
+       m := make(map[uint64][]int64)
+       tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil, nil,
+               func(ids []*pb.MessageIdData) {
+                       for _, id := range ids {
+                               m[*id.LedgerId] = id.AckSet
+                       }
+               })
+       tracker.add(&messageID{ledgerID: 0, batchIdx: 0, batchSize: 30})
+       for i := 0; i < 10; i++ {
+               tracker.add(&messageID{ledgerID: 1, batchIdx: int32(i), batchSize: 10})
+       }
+       assert.Equal(t, 0, len(m)) // the number of entries is 2, so it's not flushed
+       tracker.flush()
+       assert.Equal(t, 2, len(m))
+
+       ackSet, found := m[0]
+       assert.True(t, found)
+       assert.Greater(t, len(ackSet), 0)
+
+       ackSet, found = m[1]
+       assert.True(t, found)
+       assert.Equal(t, 0, len(ackSet)) // all messages in the batch are acknowledged
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a3dac19..18100a9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -310,7 +310,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
        pc.unAckChunksTracker = newUnAckChunksTracker(pc)
        pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
                func(id MessageID) { pc.sendIndividualAck(id) },
-               func(id MessageID) { pc.sendCumulativeAck(id) })
+               func(id MessageID) { pc.sendCumulativeAck(id) },
+               func(ids []*pb.MessageIdData) { pc.eventsCh <- ids })
        pc.setConsumerState(consumerInit)
        pc.log = client.log.SubLogger(log.Fields{
                "name":         pc.name,
@@ -837,6 +838,14 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
        }
 }
 
+func (pc *partitionConsumer) internalAckList(msgIDs []*pb.MessageIdData) {
+       pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, &pb.CommandAck{
+               AckType:    pb.CommandAck_Individual.Enum(),
+               ConsumerId: proto.Uint64(pc.consumerID),
+               MessageId:  msgIDs,
+       })
+}
+
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
        pbMsgID := response.GetMessageId()
 
@@ -1364,6 +1373,8 @@ func (pc *partitionConsumer) runEventsLoop() {
                        switch v := i.(type) {
                        case *ackRequest:
                                pc.internalAck(v)
+                       case []*pb.MessageIdData:
+                               pc.internalAckList(v)
                        case *redeliveryRequest:
                                pc.internalRedeliver(v)
                        case *unsubscribeRequest:
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 16c4399..21bc0e3 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -38,7 +38,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
                decryptor:            crypto.NewNoopDecryptor(),
        }
        pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
-               func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
 
        headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
        if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -76,7 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
                decryptor:            crypto.NewNoopDecryptor(),
        }
        pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
-               func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
        if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -111,7 +111,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
                decryptor:            crypto.NewNoopDecryptor(),
        }
        pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
-               func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
        if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
diff --git a/pulsar/message.go b/pulsar/message.go
index c44957d..98190e9 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -212,11 +212,3 @@ func messageIDCompare(lhs MessageID, rhs MessageID) int {
        }
        return 0
 }
-
-func messageIDHash(id MessageID) int64 {
-       return id.LedgerID() + 31*id.EntryID()
-}
-
-func messageIDIsBatch(id MessageID) bool {
-       return id.BatchIdx() >= 0 && id.BatchSize() > 0
-}
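
As a closing illustration (not part of the patch), here is a minimal standalone sketch of the conversion performed by `flushIndividual` above: the bitset's 64-bit words, where a set bit marks a still-unacknowledged batch index, are copied into the `AckSet` field of `MessageIdData` as `int64` words. The `main` scaffolding is illustrative only:

```go
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bitset"
)

func main() {
	// A batch of 10 messages: indexes 0..8 acknowledged, index 9 still pending.
	bs := bitset.New(10)
	bs.Set(9)

	// Mirror flushIndividual: an empty bitset (bs.None()) means the whole
	// entry is acknowledged and no AckSet needs to be sent; otherwise copy
	// the underlying uint64 words into the protobuf's int64 AckSet words.
	var ackSet []int64
	if !bs.None() {
		words := bs.Bytes() // []uint64, least-significant word first
		ackSet = make([]int64, len(words))
		for i, w := range words {
			ackSet[i] = int64(w)
		}
	}
	fmt.Println(ackSet) // [512]: only bit 9 (batch index 9) is set
}
```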
