This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new be553141 [improve] stop timer when close timedAckGroupingTracker 
(#1279)
be553141 is described below

commit be553141c52d5088981452ea5444aa62135b52d3
Author: zhou zhuohan <[email protected]>
AuthorDate: Wed Sep 4 16:48:39 2024 +0800

    [improve] stop timer when close timedAckGroupingTracker (#1279)
---
 pulsar/ack_grouping_tracker.go      |  5 +++-
 pulsar/ack_grouping_tracker_test.go | 46 +++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index dd051868..97d9e05a 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -120,7 +120,7 @@ type timedAckGroupingTracker struct {
 
        // Key is the pair of the ledger id and the entry id,
        // Value is the bit set that represents which messages are acknowledged 
if the entry stores a batch.
-       // The bit 1 represents the message has been acknowledged, i.e. the 
bits "111" represents all messages
+       // The bit 1 represents the message has not been acknowledged, i.e. the 
bits "111" represents all messages
        // in the batch whose batch size is 3 are not acknowledged.
        // After the 1st message (i.e. batch index is 0) is acknowledged, the 
bits will become "011".
        // Value is nil if the entry represents a single message.
@@ -241,6 +241,9 @@ func (t *timedAckGroupingTracker) clearPendingAcks() 
map[[2]uint64]*bitset.BitSe
 }
 
 func (t *timedAckGroupingTracker) close() {
+       if t.ticker != nil {
+               t.ticker.Stop()
+       }
        t.flushAndClean()
        if t.exitCh != nil {
                close(t.exitCh)
diff --git a/pulsar/ack_grouping_tracker_test.go 
b/pulsar/ack_grouping_tracker_test.go
index 0a794f64..202d19d9 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -217,6 +217,52 @@ func TestDuplicateAfterClose(t *testing.T) {
        assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
 }
 
+func TestCloseFlushWithoutTimer(t *testing.T) {
+       var acker mockAcker
+       tracker := newAckGroupingTracker(
+               &AckGroupingOptions{MaxSize: 3, MaxTime: 0},
+               nil,
+               func(id MessageID) { acker.ackCumulative(id) },
+               func(ids []*pb.MessageIdData) { acker.ack(ids) },
+       )
+
+       // case 1: message will not be acked because the cache is not full
+       tracker.add(&messageID{ledgerID: 1})
+       tracker.add(&messageID{ledgerID: 2})
+       assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)}))
+       assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)}))
+       assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+       // case 2: tracker close so that all messages are flushed and acked
+       tracker.close()
+       assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
+       assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2}))
+       assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs())
+}
+
+func TestCloseFlushWithTimer(t *testing.T) {
+       var acker mockAcker
+       tracker := newAckGroupingTracker(
+               &AckGroupingOptions{MaxSize: 1000, MaxTime: 10 * 1000},
+               nil,
+               func(id MessageID) { acker.ackCumulative(id) },
+               func(ids []*pb.MessageIdData) { acker.ack(ids) },
+       )
+
+       // case 1: messages are not acked because the cache is not full
+       tracker.add(&messageID{ledgerID: 1})
+       tracker.add(&messageID{ledgerID: 2})
+       assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)}))
+       assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)}))
+       assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+       // case 2: tracker close so that all messages are flushed and acked
+       tracker.close()
+       assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
+       assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2}))
+       assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs())
+}
+
 func TestTrackerPendingAcks(t *testing.T) {
        m := make(map[uint64][]int64)
        tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, 
MaxTime: 0}, nil, nil,

Reply via email to