This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new be553141 [improve] stop timer when close timedAckGroupingTracker (#1279)
be553141 is described below
commit be553141c52d5088981452ea5444aa62135b52d3
Author: zhou zhuohan <[email protected]>
AuthorDate: Wed Sep 4 16:48:39 2024 +0800
[improve] stop timer when close timedAckGroupingTracker (#1279)
---
pulsar/ack_grouping_tracker.go | 5 +++-
pulsar/ack_grouping_tracker_test.go | 46 +++++++++++++++++++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index dd051868..97d9e05a 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -120,7 +120,7 @@ type timedAckGroupingTracker struct {
// Key is the pair of the ledger id and the entry id,
// Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
- // The bit 1 represents the message has been acknowledged, i.e. the bits "111" represents all messages
+ // The bit 1 represents the message has not been acknowledged, i.e. the bits "111" represents all messages
// in the batch whose batch size is 3 are not acknowledged.
// After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
// Value is nil if the entry represents a single message.
@@ -241,6 +241,9 @@ func (t *timedAckGroupingTracker) clearPendingAcks() map[[2]uint64]*bitset.BitSe
}
func (t *timedAckGroupingTracker) close() {
+ if t.ticker != nil {
+ t.ticker.Stop()
+ }
t.flushAndClean()
if t.exitCh != nil {
close(t.exitCh)
diff --git a/pulsar/ack_grouping_tracker_test.go b/pulsar/ack_grouping_tracker_test.go
index 0a794f64..202d19d9 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -217,6 +217,52 @@ func TestDuplicateAfterClose(t *testing.T) {
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
}
+func TestCloseFlushWithoutTimer(t *testing.T) {
+ var acker mockAcker
+ tracker := newAckGroupingTracker(
+ &AckGroupingOptions{MaxSize: 3, MaxTime: 0},
+ nil,
+ func(id MessageID) { acker.ackCumulative(id) },
+ func(ids []*pb.MessageIdData) { acker.ack(ids) },
+ )
+
+ // case 1: message will not be acked because the cache is not full
+ tracker.add(&messageID{ledgerID: 1})
+ tracker.add(&messageID{ledgerID: 2})
+ assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)}))
+ assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)}))
+ assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+ // case 2: tracker close so that all messages are flushed and acked
+ tracker.close()
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2}))
+ assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs())
+}
+
+func TestCloseFlushWithTimer(t *testing.T) {
+ var acker mockAcker
+ tracker := newAckGroupingTracker(
+ &AckGroupingOptions{MaxSize: 1000, MaxTime: 10 * 1000},
+ nil,
+ func(id MessageID) { acker.ackCumulative(id) },
+ func(ids []*pb.MessageIdData) { acker.ack(ids) },
+ )
+
+ // case 1: messages are not acked because the cache is not full
+ tracker.add(&messageID{ledgerID: 1})
+ tracker.add(&messageID{ledgerID: 2})
+ assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)}))
+ assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)}))
+ assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+ // case 2: tracker close so that all messages are flushed and acked
+ tracker.close()
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2}))
+ assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs())
+}
+
func TestTrackerPendingAcks(t *testing.T) {
m := make(map[uint64][]int64)
tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0}, nil, nil,