This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 352c463  Optimize batch index ACK performance (#988)
352c463 is described below

commit 352c463194916cab7772c0729c9cdac14404e8bd
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Fri Mar 10 09:57:38 2023 +0800

    Optimize batch index ACK performance (#988)

    ### Motivation

    Currently, when `EnableBatchIndexAck` is true, the ACK performance is very poor.
    There are two main reasons:
    1. Acknowledgment by list is not supported. It means that even if N MessageIDs
       are grouped, there are still N ACK requests to send.
    2. The implementation of the ACK grouping tracker is wrong. Given a batch that
       has N messages, when batch index ACK is enabled, each MessageID is cached.
       However, after all these N MessageIDs have arrived, the current
       implementation does not clear them.

    ### Modifications

    - Add a `func(id []*pb.MessageIdData)` to the ACK grouping tracker. When
      flushing individual ACKs, construct the slice and wrap it into a
      `CommandAck` directly.
    - Refactor the implementation of the ACK grouping tracker:
      - Do not save each MessageID instance; instead, save the ledger id and the
        entry id as the key of `pendingAcks`.
      - Release the mutex before calling the ACK functions.
    - Add `TestTrackerPendingAcks` to verify the list of MessageIDs to ACK. Since
      the ACK order can no longer be guaranteed after this change, sort the
      acknowledged MessageIDs in `ack_grouping_tracker_test.go`.
---
 pulsar/ack_grouping_tracker.go      | 281 ++++++++++++++++++------------------
 pulsar/ack_grouping_tracker_test.go |  54 +++++--
 pulsar/consumer_partition.go        |  13 +-
 pulsar/consumer_partition_test.go   |   6 +-
 pulsar/message.go                   |   8 -
 5 files changed, 196 insertions(+), 166 deletions(-)

diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index 22aba11..dd05186 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -19,8 +19,10 @@ package pulsar
 
 import (
 	"sync"
+	"sync/atomic"
 	"time"
 
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/bits-and-blooms/bitset"
 )
 
@@ -40,7 +42,8 @@ type ackGroupingTracker interface {
 
 func newAckGroupingTracker(options *AckGroupingOptions,
 	ackIndividual func(id MessageID),
-	ackCumulative func(id MessageID)) ackGroupingTracker {
+	ackCumulative func(id MessageID),
+	ackList func(ids []*pb.MessageIdData)) ackGroupingTracker {
 	if options == nil {
 		options = &AckGroupingOptions{
 			MaxSize: 1000,
@@ -56,38 +59,28 @@ func newAckGroupingTracker(options *AckGroupingOptions,
 	}
 
 	t := &timedAckGroupingTracker{
-		singleAcks:        make([]MessageID, options.MaxSize),
-		pendingAcks:       make(map[int64]*bitset.BitSet),
-		lastCumulativeAck: EarliestMessageID(),
-		ackIndividual:     ackIndividual,
+		maxNumAcks:        int(options.MaxSize),
 		ackCumulative:     ackCumulative,
-		ackList: func(ids []MessageID) {
-			// TODO: support ack a list of MessageIDs
-			for _, id := range ids {
-				ackIndividual(id)
-			}
-		},
-		options: *options,
-		tick:    time.NewTicker(time.Hour),
-		donCh:   make(chan struct{}),
+		ackList:           ackList,
+		pendingAcks:       make(map[[2]uint64]*bitset.BitSet),
+		lastCumulativeAck: EarliestMessageID(),
 	}
 	if options.MaxTime > 0 {
-		t.tick = time.NewTicker(options.MaxTime)
-	} else {
-		t.tick.Stop()
+		t.ticker = time.NewTicker(options.MaxTime)
+		t.exitCh = make(chan struct{})
+		go func() {
+			for {
+				select {
+				case <-t.exitCh:
+					return
+				case <-t.ticker.C:
+					t.flush()
+				}
+			}
+		}()
 	}
-	go func() {
-		for {
-			select {
-			case <-t.donCh:
-				return
-			case <-t.tick.C:
-				t.flush()
-			}
-		}
-	}()
 	return t
 }
@@ -117,157 +110,157 @@ func (i *immediateAckGroupingTracker) flushAndClean() {
 func (i *immediateAckGroupingTracker) close() {
 }
 
-func (t *timedAckGroupingTracker) addAndCheckIfFull(id MessageID) bool {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	t.singleAcks[t.index] = id
-	t.index++
-	key := messageIDHash(id)
-	ackSet, found := t.pendingAcks[key]
-	if !found {
-		if messageIDIsBatch(id) {
-			ackSet = bitset.New(uint(id.BatchSize()))
-			for i := 0; i < int(id.BatchSize()); i++ {
-				ackSet.Set(uint(i))
-			}
-			t.pendingAcks[key] = ackSet
-		} else {
-			t.pendingAcks[key] = nil
-		}
-	}
-	if ackSet != nil {
-		ackSet.Clear(uint(id.BatchIdx()))
-	}
-	return t.index == len(t.singleAcks)
-}
-
-func (t *timedAckGroupingTracker) tryUpdateLastCumulativeAck(id MessageID) {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	if messageIDCompare(t.lastCumulativeAck, id) < 0 {
-		t.lastCumulativeAck = id
-		t.cumulativeAckRequired = true
-	}
-}
-
-func (t *timedAckGroupingTracker) flushIndividualAcks() {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	if t.index > 0 {
-		t.ackList(t.singleAcks[0:t.index])
-		for _, id := range t.singleAcks[0:t.index] {
-			key := messageIDHash(id)
-			ackSet, found := t.pendingAcks[key]
-			if !found {
-				continue
-			}
-			if ackSet == nil {
-				delete(t.pendingAcks, key)
-			} else {
-				ackSet.Clear(uint(id.BatchIdx()))
-				if ackSet.None() { // all messages have been acknowledged
-					delete(t.pendingAcks, key)
-				}
-			}
-			delete(t.pendingAcks, messageIDHash(id))
-		}
-		t.index = 0
-	}
-}
-
-func (t *timedAckGroupingTracker) flushCumulativeAck() {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	if t.cumulativeAckRequired {
-		t.ackCumulative(t.lastCumulativeAck)
-		t.cumulativeAckRequired = false
-	}
-}
-
-func (t *timedAckGroupingTracker) clean() {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	maxSize := len(t.singleAcks)
-	t.singleAcks = make([]MessageID, maxSize)
-	t.index = 0
-	t.pendingAcks = make(map[int64]*bitset.BitSet)
-	t.lastCumulativeAck = EarliestMessageID()
-	t.cumulativeAckRequired = false
-}
-
 type timedAckGroupingTracker struct {
-	singleAcks []MessageID
-	index      int
+	sync.RWMutex
 
-	// Key is the hash code of the ledger id and the netry id,
+	maxNumAcks    int
+	ackCumulative func(id MessageID)
+	ackList       func(ids []*pb.MessageIdData)
+	ticker        *time.Ticker
+
+	// Key is the pair of the ledger id and the entry id,
 	// Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
 	// The bit 1 represents the message has been acknowledged, i.e. the bits "111" represents all messages
 	// in the batch whose batch size is 3 are not acknowledged.
 	// After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
 	// Value is nil if the entry represents a single message.
-	pendingAcks map[int64]*bitset.BitSet
+	pendingAcks map[[2]uint64]*bitset.BitSet
 
 	lastCumulativeAck     MessageID
-	cumulativeAckRequired bool
-
-	ackIndividual func(id MessageID)
-	ackCumulative func(id MessageID)
-	ackList       func(ids []MessageID)
-
-	options AckGroupingOptions
-	donCh   chan struct{}
-	tick    *time.Ticker
+	cumulativeAckRequired int32
 
-	mutex sync.RWMutex
+	exitCh chan struct{}
 }
 
 func (t *timedAckGroupingTracker) add(id MessageID) {
-	if t.addAndCheckIfFull(id) {
-		t.flushIndividualAcks()
-		if t.options.MaxTime > 0 {
-			t.tick.Reset(t.options.MaxTime)
+	if acks := t.tryAddIndividual(id); acks != nil {
+		t.flushIndividual(acks)
+	}
+}
+
+func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID) map[[2]uint64]*bitset.BitSet {
+	t.Lock()
+	defer t.Unlock()
+	key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
+
+	batchIdx := id.BatchIdx()
+	batchSize := id.BatchSize()
+
+	if batchIdx >= 0 && batchSize > 0 {
+		bs, found := t.pendingAcks[key]
+		if !found {
+			if batchSize > 1 {
+				bs = bitset.New(uint(batchSize))
+				for i := uint(0); i < uint(batchSize); i++ {
+					bs.Set(i)
+				}
+			}
+			t.pendingAcks[key] = bs
 		}
+		if bs != nil {
+			bs.Clear(uint(batchIdx))
+		}
+	} else {
+		t.pendingAcks[key] = nil
 	}
+
+	if len(t.pendingAcks) >= t.maxNumAcks {
+		pendingAcks := t.pendingAcks
+		t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
+		return pendingAcks
+	}
+	return nil
 }
 
 func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
-	t.tryUpdateLastCumulativeAck(id)
-	if t.options.MaxTime <= 0 {
-		t.flushCumulativeAck()
+	if t.tryUpdateCumulative(id) && t.ticker == nil {
+		t.ackCumulative(id)
 	}
 }
 
+func (t *timedAckGroupingTracker) tryUpdateCumulative(id MessageID) bool {
+	t.Lock()
+	defer t.Unlock()
+	if messageIDCompare(t.lastCumulativeAck, id) < 0 {
+		t.lastCumulativeAck = id
+		atomic.StoreInt32(&t.cumulativeAckRequired, 1)
+		return true
+	}
+	return false
+}
+
 func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
-	t.mutex.RLock()
+	t.RLock()
+	defer t.RUnlock()
 	if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
-		t.mutex.RUnlock()
 		return true
 	}
-	ackSet, found := t.pendingAcks[messageIDHash(id)]
-	if !found {
-		t.mutex.RUnlock()
-		return false
-	}
-	t.mutex.RUnlock()
-	if ackSet == nil || !messageIDIsBatch(id) {
-		// NOTE: should we panic when ackSet != nil and messageIDIsBatch(id) is true?
-		return true
+	key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
+	if bs, found := t.pendingAcks[key]; found {
+		if bs == nil {
+			return true
+		}
+		if !bs.Test(uint(id.BatchIdx())) {
+			return true
+		}
 	}
-	// 0 represents the message has been acknowledged
-	return !ackSet.Test(uint(id.BatchIdx()))
+	return false
 }
 
 func (t *timedAckGroupingTracker) flush() {
-	t.flushIndividualAcks()
-	t.flushCumulativeAck()
+	if acks := t.clearPendingAcks(); len(acks) > 0 {
+		t.flushIndividual(acks)
+	}
+	if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
+		t.RLock()
+		id := t.lastCumulativeAck
+		t.RUnlock()
+		t.ackCumulative(id)
+	}
 }
 
 func (t *timedAckGroupingTracker) flushAndClean() {
-	t.flush()
-	t.clean()
+	if acks := t.clearPendingAcks(); len(acks) > 0 {
+		t.flushIndividual(acks)
+	}
+	if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
+		t.Lock()
+		id := t.lastCumulativeAck
+		t.lastCumulativeAck = EarliestMessageID()
+		t.Unlock()
+		t.ackCumulative(id)
+	}
+}
+
+func (t *timedAckGroupingTracker) clearPendingAcks() map[[2]uint64]*bitset.BitSet {
+	t.Lock()
+	defer t.Unlock()
+	pendingAcks := t.pendingAcks
+	t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
+	return pendingAcks
 }
 
 func (t *timedAckGroupingTracker) close() {
 	t.flushAndClean()
-	close(t.donCh)
+	if t.exitCh != nil {
+		close(t.exitCh)
+	}
+}
+
+func (t *timedAckGroupingTracker) flushIndividual(pendingAcks map[[2]uint64]*bitset.BitSet) {
+	msgIDs := make([]*pb.MessageIdData, 0, len(pendingAcks))
+	for k, v := range pendingAcks {
+		ledgerID := k[0]
+		entryID := k[1]
+		msgID := &pb.MessageIdData{LedgerId: &ledgerID, EntryId: &entryID}
+		if v != nil && !v.None() {
+			bytes := v.Bytes()
+			msgID.AckSet = make([]int64, len(bytes))
+			for i := 0; i < len(bytes); i++ {
+				msgID.AckSet[i] = int64(bytes[i])
+			}
+		}
+		msgIDs = append(msgIDs, msgID)
+	}
+	t.ackList(msgIDs)
 }
diff --git a/pulsar/ack_grouping_tracker_test.go b/pulsar/ack_grouping_tracker_test.go
index 41d24d4..0a794f6 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -19,11 +19,13 @@ package pulsar
 
 import (
 	"fmt"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
 
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -45,7 +47,8 @@ func TestNoCacheTracker(t *testing.T) {
 		ledgerID1 := int64(-1)
 		tracker := newAckGroupingTracker(&option,
 			func(id MessageID) { ledgerID0 = id.LedgerID() },
-			func(id MessageID) { ledgerID1 = id.LedgerID() })
+			func(id MessageID) { ledgerID1 = id.LedgerID() },
+			nil)
 
 		tracker.add(&messageID{ledgerID: 1})
 		assert.Equal(t, atomic.LoadInt64(&ledgerID0), int64(1))
@@ -61,10 +64,12 @@ type mockAcker struct {
 	cumulativeLedgerID int64
 }
 
-func (a *mockAcker) ack(id MessageID) {
+func (a *mockAcker) ack(ids []*pb.MessageIdData) {
 	defer a.Unlock()
 	a.Lock()
-	a.ledgerIDs = append(a.ledgerIDs, id.LedgerID())
+	for _, id := range ids {
+		a.ledgerIDs = append(a.ledgerIDs, int64(*id.LedgerId))
+	}
 }
 
 func (a *mockAcker) ackCumulative(id MessageID) {
@@ -74,6 +79,8 @@ func (a *mockAcker) ackCumulative(id MessageID) {
 func (a *mockAcker) getLedgerIDs() []int64 {
 	defer a.Unlock()
 	a.Lock()
+
+	sort.Slice(a.ledgerIDs, func(i, j int) bool { return a.ledgerIDs[i] < a.ledgerIDs[j] })
 	return a.ledgerIDs
 }
 
@@ -88,8 +95,8 @@ func (a *mockAcker) reset() {
 
 func TestCachedTracker(t *testing.T) {
 	var acker mockAcker
-	tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
-		func(id MessageID) { acker.ack(id) }, func(id MessageID) { acker.ackCumulative(id) })
+	tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil,
+		func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
 
 	tracker.add(&messageID{ledgerID: 1})
 	tracker.add(&messageID{ledgerID: 2})
@@ -126,7 +133,8 @@ func TestCachedTracker(t *testing.T) {
 func TestTimedTrackerIndividualAck(t *testing.T) {
 	var acker mockAcker
 	// MaxSize: 1000, MaxTime: 100ms
-	tracker := newAckGroupingTracker(nil, func(id MessageID) { acker.ack(id) }, nil)
+	tracker := newAckGroupingTracker(nil, nil,
+		func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
 
 	expected := make([]int64, 0)
 	for i := 0; i < 999; i++ {
@@ -161,7 +169,7 @@ func TestTimedTrackerIndividualAck(t *testing.T) {
 func TestTimedTrackerCumulativeAck(t *testing.T) {
 	var acker mockAcker
 	// MaxTime is 100ms
-	tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) })
+	tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) }, nil)
 
 	// case 1: flush because of the timeout
 	tracker.addCumulative(&messageID{ledgerID: 1})
@@ -182,7 +190,8 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
 }
 
 func TestTimedTrackerIsDuplicate(t *testing.T) {
-	tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {})
+	tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {},
+		func(id []*pb.MessageIdData) {})
 
 	tracker.add(&messageID{batchIdx: 0, batchSize: 3})
 	tracker.add(&messageID{batchIdx: 2, batchSize: 3})
@@ -198,8 +207,8 @@ func TestTimedTrackerIsDuplicate(t *testing.T) {
 
 func TestDuplicateAfterClose(t *testing.T) {
 	var acker mockAcker
-	tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
-		func(id MessageID) { acker.ack(id) }, func(id MessageID) { acker.ackCumulative(id) })
+	tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil,
+		func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
 
 	tracker.add(&messageID{ledgerID: 1})
 	assert.True(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
@@ -207,3 +216,28 @@ func TestDuplicateAfterClose(t *testing.T) {
 	tracker.close()
 	assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
 }
+
+func TestTrackerPendingAcks(t *testing.T) {
+	m := make(map[uint64][]int64)
+	tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil, nil,
+		func(ids []*pb.MessageIdData) {
+			for _, id := range ids {
+				m[*id.LedgerId] = id.AckSet
+			}
+		})
+	tracker.add(&messageID{ledgerID: 0, batchIdx: 0, batchSize: 30})
+	for i := 0; i < 10; i++ {
+		tracker.add(&messageID{ledgerID: 1, batchIdx: int32(i), batchSize: 10})
+	}
+	assert.Equal(t, 0, len(m)) // the number of entries is 2, so it's not flushed
+	tracker.flush()
+	assert.Equal(t, 2, len(m))
+
+	ackSet, found := m[0]
+	assert.True(t, found)
+	assert.Greater(t, len(ackSet), 0)
+
+	ackSet, found = m[1]
+	assert.True(t, found)
+	assert.Equal(t, 0, len(ackSet)) // all messages in the batch are acknowledged
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a3dac19..18100a9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -310,7 +310,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 	pc.unAckChunksTracker = newUnAckChunksTracker(pc)
 	pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
 		func(id MessageID) { pc.sendIndividualAck(id) },
-		func(id MessageID) { pc.sendCumulativeAck(id) })
+		func(id MessageID) { pc.sendCumulativeAck(id) },
+		func(ids []*pb.MessageIdData) { pc.eventsCh <- ids })
 	pc.setConsumerState(consumerInit)
 	pc.log = client.log.SubLogger(log.Fields{
 		"name": pc.name,
@@ -837,6 +838,14 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
 	}
 }
 
+func (pc *partitionConsumer) internalAckList(msgIDs []*pb.MessageIdData) {
+	pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, &pb.CommandAck{
+		AckType:    pb.CommandAck_Individual.Enum(),
+		ConsumerId: proto.Uint64(pc.consumerID),
+		MessageId:  msgIDs,
+	})
+}
+
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
 	pbMsgID := response.GetMessageId()
 
@@ -1364,6 +1373,8 @@ func (pc *partitionConsumer) runEventsLoop() {
 			switch v := i.(type) {
 			case *ackRequest:
 				pc.internalAck(v)
+			case []*pb.MessageIdData:
+				pc.internalAckList(v)
 			case *redeliveryRequest:
 				pc.internalRedeliver(v)
 			case *unsubscribeRequest:
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 16c4399..21bc0e3 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -38,7 +38,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
 		decryptor: crypto.NewNoopDecryptor(),
 	}
 	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
-		func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
 
 	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -76,7 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 		decryptor: crypto.NewNoopDecryptor(),
 	}
 	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
-		func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -111,7 +111,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
 		decryptor: crypto.NewNoopDecryptor(),
 	}
 	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
-		func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
diff --git a/pulsar/message.go b/pulsar/message.go
index c44957d..98190e9 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -212,11 +212,3 @@ func messageIDCompare(lhs MessageID, rhs MessageID) int {
 	}
 	return 0
 }
-
-func messageIDHash(id MessageID) int64 {
-	return id.LedgerID() + 31*id.EntryID()
-}
-
-func messageIDIsBatch(id MessageID) bool {
-	return id.BatchIdx() >= 0 && id.BatchSize() > 0
-}
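The Modifications section above describes keeping one bit set per (ledger id, entry id) entry and serializing it into the `AckSet` field of `MessageIdData` when the grouped individual ACKs are flushed. The self-contained sketch below is not part of the commit; the batch size, the acknowledged indexes, and the printed output are illustrative assumptions. It only shows how that encoding behaves with the `github.com/bits-and-blooms/bitset` library used in the diff: every bit starts at 1 (not yet acknowledged), acknowledging a batch index clears its bit, and the remaining 64-bit words from `Bytes()` become the `AckSet` values.

```go
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bitset"
)

func main() {
	// Hypothetical batch of 10 messages stored under a single (ledgerID, entryID) key.
	const batchSize = 10

	// Follow the tracker's convention from the diff: every bit starts at 1,
	// meaning the message at that batch index has not been acknowledged yet.
	bs := bitset.New(batchSize)
	for i := uint(0); i < batchSize; i++ {
		bs.Set(i)
	}

	// Acknowledging a batch index clears its bit.
	for _, idx := range []uint{0, 1, 4} {
		bs.Clear(idx)
	}

	// When every bit is cleared the whole entry is acknowledged and no AckSet
	// is attached to the MessageIdData at all (see flushIndividual in the diff).
	if bs.None() {
		fmt.Println("entire batch acknowledged; AckSet omitted")
		return
	}

	// Otherwise the bit set is exported as 64-bit words, which the tracker
	// copies into MessageIdData.AckSet as []int64.
	words := bs.Bytes()
	ackSet := make([]int64, len(words))
	for i, w := range words {
		ackSet[i] = int64(w)
	}
	fmt.Printf("AckSet words: %v (first word in binary: %010b)\n", ackSet, words[0])
}
```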