This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d99d2c06 [improve] modify the negativeACK structure to reduce memory
overhead (#1410)
d99d2c06 is described below
commit d99d2c06cf591a3b47db9d890160d3a5a6689b32
Author: Guangyang Deng <[email protected]>
AuthorDate: Sat Oct 18 15:32:50 2025 +0800
[improve] modify the negativeACK structure to reduce memory overhead (#1410)
* improve: modify the structure of NegativeAckTracker to reduce memory
overhead
* fix TestNegativeAckPrecisionBitCnt bug
* chore: formatting issues
* fix: fix issues with TestConsumerNack
* chore: revert changes to TestConsumerNack
* fix: add defaultNackPrecisionBitVal
* fix: revert changes to TestZeroQueueConsumer_Nack
* add TestNackPrecisionBitDefaultBehavior
* chore: update TestNegativeAckPrecisionBitCnt test
* fix: make TestNegativeAckPrecisionBitCnt more stable
---
go.mod | 5 +-
go.sum | 11 ++-
pulsar/consumer.go | 5 ++
pulsar/consumer_impl.go | 7 ++
pulsar/consumer_partition.go | 4 +-
pulsar/consumer_test.go | 96 +++++++++++++++++++++++++
pulsar/negative_acks_tracker.go | 136 ++++++++++++++++++++++++-----------
pulsar/negative_acks_tracker_test.go | 9 ++-
8 files changed, 224 insertions(+), 49 deletions(-)
diff --git a/go.mod b/go.mod
index 6c388209..0e9fb7d8 100644
--- a/go.mod
+++ b/go.mod
@@ -6,9 +6,11 @@ require (
github.com/99designs/keyring v1.2.1
github.com/AthenZ/athenz v1.12.13
github.com/DataDog/zstd v1.5.0
- github.com/bits-and-blooms/bitset v1.4.0
+ github.com/RoaringBitmap/roaring/v2 v2.8.0
+ github.com/bits-and-blooms/bitset v1.12.0
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
+ github.com/emirpasic/gods v1.18.1
github.com/docker/docker v28.0.0+incompatible
github.com/docker/go-connections v0.5.0
github.com/golang-jwt/jwt/v5 v5.2.2
@@ -78,6 +80,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
+ github.com/mschoch/smat v0.2.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
diff --git a/go.sum b/go.sum
index c7183334..44ec10eb 100644
--- a/go.sum
+++ b/go.sum
@@ -14,12 +14,14 @@ github.com/DataDog/zstd v1.5.0
h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
github.com/DataDog/zstd v1.5.0/go.mod
h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Microsoft/go-winio v0.6.2
h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod
h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
+github.com/RoaringBitmap/roaring/v2 v2.8.0
h1:y1rdtixfXvaITKzkfiKvScI0hlBJHe9sfzJp8cgeM7w=
+github.com/RoaringBitmap/roaring/v2 v2.8.0/go.mod
h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
github.com/ardielle/ardielle-go v1.5.2
h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod
h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
-github.com/bits-and-blooms/bitset v1.4.0
h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8=
-github.com/bits-and-blooms/bitset v1.4.0/go.mod
h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
+github.com/bits-and-blooms/bitset v1.12.0
h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
+github.com/bits-and-blooms/bitset v1.12.0/go.mod
h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod
h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/cenkalti/backoff/v4 v4.2.1
h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
@@ -53,6 +55,8 @@ github.com/docker/go-units v0.5.0
h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod
h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dvsekhvalnov/jose2go v1.6.0
h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod
h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
+github.com/emirpasic/gods v1.18.1
h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod
h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/felixge/httpsnoop v1.0.4
h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod
h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.4.7/go.mod
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -154,6 +158,8 @@ github.com/modern-go/reflect2 v1.0.2
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod
h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
+github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
+github.com/mschoch/smat v0.2.0/go.mod
h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mtibben/percent v0.2.1
h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
github.com/mtibben/percent v0.2.1/go.mod
h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
@@ -346,6 +352,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 80bbf01c..7996335a 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -191,6 +191,11 @@ type ConsumerOptions struct {
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration
+ // NackPrecisionBit specifies the precision bit for nack redelivery delay.
+ // This is used to trim the lower bits of the nack redelivery delay to reduce memory usage.
+ // Default is 8 bits.
+ NackPrecisionBit *int64
+
// Name specifies the consumer name.
Name string
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 05db0726..08c825e3 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -118,6 +118,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}
+ if options.NackPrecisionBit == nil {
+ options.NackPrecisionBit = ptr(defaultNackPrecisionBit)
+ } else if *options.NackPrecisionBit < 0 {
+ return nil, newError(InvalidConfiguration, "NackPrecisionBit cannot be negative")
+ }
+
// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
@@ -452,6 +458,7 @@ func newPartitionConsumerOpts(topic, consumerName string,
idx int, options Consu
receiverQueueSize: options.ReceiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: options.NackBackoffPolicy,
+ nackPrecisionBit: options.NackPrecisionBit,
metadata: options.Properties,
subProperties: options.SubscriptionProperties,
replicateSubscriptionState: options.ReplicateSubscriptionState,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fcde4f3a..bfd44b18 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
autoReceiverQueueSize bool
nackRedeliveryDelay time.Duration
nackBackoffPolicy NackBackoffPolicy
+ nackPrecisionBit *int64
metadata map[string]string
subProperties map[string]string
replicateSubscriptionState bool
@@ -424,7 +425,8 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
pc.decryptor = decryptor
- pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
+ pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log,
+ options.nackPrecisionBit)
err := pc.grabConn("")
if err != nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index c36f1f59..9f20ed84 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1225,6 +1225,102 @@ func TestConsumerNack(t *testing.T) {
}
}
+func TestNegativeAckPrecisionBitCnt(t *testing.T) {
+ // Validate behavior across precision bits and default (nil -> 8)
+ const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3)
+ ctx := context.Background()
+
+ client, err := NewClient(ClientOptions{URL: lookupURL})
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // Helper to verify behavior for a given NackPrecisionBit and boundary bits.
+ testPrecisionBitBehavior := func(nackPrecisionBit *int64, boundaryBits int64) {
+ // Create topic, consumer and producer inside the function
+ topicName := fmt.Sprintf("testNackPrecisionBit-%d-%d",
boundaryBits, time.Now().UnixNano())
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: fmt.Sprintf("sub-%d",
boundaryBits),
+ Type: Shared,
+ NackRedeliveryDelay: delay,
+ NackPrecisionBit: nackPrecisionBit, // can be nil
for default behavior
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{Topic:
topicName})
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // Align to the next window boundary based on boundaryBits
+ windowMs := int64(1) << boundaryBits
+ nowMs := time.Now().UnixMilli()
+ nextBoundaryMs := ((nowMs / windowMs) + 1) * windowMs // Next
boundary
+ time.Sleep(time.Duration(nextBoundaryMs-nowMs) *
time.Millisecond)
+
+ // Send first message at the boundary
+ content1 := fmt.Sprintf("msg1-p%d", boundaryBits)
+ _, err = producer.Send(ctx, &ProducerMessage{Payload:
[]byte(content1)})
+ assert.Nil(t, err)
+
+ // Send second message around 3/4 into the window (still in
same window)
+ time.Sleep(time.Duration(windowMs*3/4) * time.Millisecond)
+ content2 := fmt.Sprintf("msg2-p%d", boundaryBits)
+ _, err = producer.Send(ctx, &ProducerMessage{Payload:
[]byte(content2)})
+ assert.Nil(t, err)
+
+ // Receive and nack both messages
+ m1, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, content1, string(m1.Payload()))
+ consumer.Nack(m1)
+ m2, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, content2, string(m2.Payload()))
+ consumer.Nack(m2)
+
+ // Expected redelivery window considering precision and tracker
tick
+ expected := time.Now().Add(delay)
+ deviation := time.Duration(windowMs) * time.Millisecond
+
+ // Both should be redelivered in the same cycle
+ rm1, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ redeliveryTime1 := time.Now()
+ rm2, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ redeliveryTime2 := time.Now()
+
+ // For both the default precision (nil) and precisionBit=8, boundaryBits is 8.
+ // This checks that the default precisionBit is correctly set to 8,
+ // and that its redelivery behavior matches a consumer explicitly configured with precisionBit=8.
+ if boundaryBits == 8 {
+ assert.InDelta(t, redeliveryTime1.UnixMilli(),
redeliveryTime2.UnixMilli(), 1)
+ }
+
+ // Redelivery should occur within [expected-window, expected+buffer]
+ minExpected := expected.Add(-deviation)
+ maxExpected := expected.Add(150 * time.Millisecond)
+ assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli())
+ assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli())
+
+ consumer.Ack(rm1)
+ consumer.Ack(rm2)
+ }
+
+ // Run for precision bits 1...8 with matching boundary bits
+ for bits := int64(1); bits <= int64(8); bits++ {
+ t.Run(fmt.Sprintf("PrecisionBits=%d", bits), func(_ *testing.T)
{
+ testPrecisionBitBehavior(ptr(bits), bits)
+ })
+ }
+
+ // Default behavior (nil) should match precision bit 8
+ t.Run("DefaultPrecisionBits=8", func(_ *testing.T) {
+ testPrecisionBitBehavior(nil, int64(8))
+ })
+}
+
func TestConsumerCompression(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 1331c7df..ac9f08d7 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -21,35 +21,57 @@ import (
"sync"
"time"
+ "github.com/RoaringBitmap/roaring/v2/roaring64"
log "github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/emirpasic/gods/trees/avltree"
)
type redeliveryConsumer interface {
Redeliver(msgIDs []messageID)
}
+type ledgerID = int64
+
type negativeAcksTracker struct {
sync.Mutex
- doneCh chan interface{}
- doneOnce sync.Once
- negativeAcks map[messageID]time.Time
- rc redeliveryConsumer
- nackBackoff NackBackoffPolicy
- tick *time.Ticker
- delay time.Duration
- log log.Logger
+ doneCh chan interface{}
+ doneOnce sync.Once
+ negativeAcks *avltree.Tree
+ nackPrecisionBit *int64
+ rc redeliveryConsumer
+ nackBackoff NackBackoffPolicy
+ tick *time.Ticker
+ delay time.Duration
+ log log.Logger
}
func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
- nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger, nackPrecisionBit *int64) *negativeAcksTracker {
t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- nackBackoff: nackBackoffPolicy,
- log: logger,
+ doneCh: make(chan interface{}),
+ negativeAcks: avltree.NewWith(func(a, b interface{}) int {
+ // Perform type assertions and handle invalid types.
+ timeA, okA := a.(time.Time)
+ timeB, okB := b.(time.Time)
+
+ if !okA || !okB {
+ panic("invalid type: both values must be of type time.Time")
+ }
+
+ // Compare the two time.Time values.
+ if timeA.Before(timeB) {
+ return -1
+ } else if timeA.After(timeB) {
+ return 1
+ }
+ return 0 // Equal times.
+ }),
+ rc: rc,
+ nackBackoff: nackBackoffPolicy,
+ log: logger,
+ nackPrecisionBit: nackPrecisionBit,
}
if nackBackoffPolicy != nil {
@@ -65,6 +87,37 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay
time.Duration,
return t
}
+func trimLowerBit(ts int64, precisionBit int64) int64 {
+ if precisionBit <= 0 {
+ return ts
+ }
+ mask := ^((int64(1) << precisionBit) - 1)
+ return ts & mask
+}
+
+func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Duration) {
+ t.Lock()
+ defer t.Unlock()
+
+ targetTime := time.Now().Add(delay)
+ trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), *t.nackPrecisionBit))
+ // try get trimmedTime
+ value, exists := t.negativeAcks.Get(trimmedTime)
+ if !exists {
+ newMap := make(map[ledgerID]*roaring64.Bitmap)
+ t.negativeAcks.Put(trimmedTime, newMap)
+ value = newMap
+ }
+ bitmapMap, ok := value.(map[ledgerID]*roaring64.Bitmap)
+ if !ok {
+ panic("negativeAcksTracker: value is not of expected type map[LedgerID]*roaring64.Bitmap")
+ }
+ if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists {
+ bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap()
+ }
+ bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID))
+}
+
func (t *negativeAcksTracker) Add(msgID *messageID) {
// Always clear up the batch index since we want to track the nack
// for the entire batch
@@ -74,17 +127,7 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
batchIdx: 0,
}
- t.Lock()
- defer t.Unlock()
-
- _, present := t.negativeAcks[batchMsgID]
- if present {
- // The batch is already being tracked
- return
- }
-
- targetTime := time.Now().Add(t.delay)
- t.negativeAcks[batchMsgID] = targetTime
+ putNackEntry(t, &batchMsgID, t.delay)
}
func (t *negativeAcksTracker) AddMessage(msg Message) {
@@ -100,17 +143,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
batchIdx: 0,
}
- t.Lock()
- defer t.Unlock()
-
- _, present := t.negativeAcks[batchMsgID]
- if present {
- // The batch is already being tracked
- return
- }
-
- targetTime := time.Now().Add(nackBackoffDelay)
- t.negativeAcks[batchMsgID] = targetTime
+ putNackEntry(t, &batchMsgID, nackBackoffDelay)
}
func (t *negativeAcksTracker) track() {
@@ -127,13 +160,28 @@ func (t *negativeAcksTracker) track() {
t.Lock()
- for msgID, targetTime := range t.negativeAcks {
- t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now)
- if targetTime.Before(now) {
- t.log.Debugf("Adding MsgId: %v", msgID)
- msgIDs = append(msgIDs, msgID)
- delete(t.negativeAcks, msgID)
+ iterator := t.negativeAcks.Iterator()
+ for iterator.Next() {
+ targetTime := iterator.Key().(time.Time)
+ // because use ordered map, so we can early break
+ if targetTime.After(now) {
+ break
+ }
+
+ ledgerMap := iterator.Value().(map[ledgerID]*roaring64.Bitmap)
+ for ledgerID, entrySet := range ledgerMap {
+ for _, entryID := range entrySet.ToArray() {
+ msgID := messageID{
+ ledgerID: ledgerID,
+ entryID: int64(entryID),
+ batchIdx: 0,
+ }
+ msgIDs = append(msgIDs, msgID)
+ }
}
+
+ // Safe deletion during iteration
+ t.negativeAcks.Remove(targetTime)
}
t.Unlock()
@@ -153,3 +201,7 @@ func (t *negativeAcksTracker) Close() {
t.doneCh <- nil
})
}
+
+func ptr[T any](v T) *T { return &v }
+
+const defaultNackPrecisionBit = int64(8)
diff --git a/pulsar/negative_acks_tracker_test.go
b/pulsar/negative_acks_tracker_test.go
index 3f03ac44..bf395346 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -29,6 +29,8 @@ import (
const testNackDelay = 300 * time.Millisecond
+var testNackPrecisionBit = ptr(defaultNackPrecisionBit)
+
type nackMockedConsumer struct {
ch chan messageID
closed bool
@@ -80,7 +82,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
func TestNacksTracker(t *testing.T) {
nmc := newNackMockedConsumer(nil)
- nacks := newNegativeAcksTracker(nmc, testNackDelay, nil,
log.DefaultNopLogger())
+ nacks := newNegativeAcksTracker(nmc, testNackDelay, nil,
log.DefaultNopLogger(), testNackPrecisionBit)
nacks.Add(&messageID{
ledgerID: 1,
@@ -113,7 +115,7 @@ func TestNacksTracker(t *testing.T) {
func TestNacksWithBatchesTracker(t *testing.T) {
nmc := newNackMockedConsumer(nil)
- nacks := newNegativeAcksTracker(nmc, testNackDelay, nil,
log.DefaultNopLogger())
+ nacks := newNegativeAcksTracker(nmc, testNackDelay, nil,
log.DefaultNopLogger(), testNackPrecisionBit)
nacks.Add(&messageID{
ledgerID: 1,
@@ -156,7 +158,8 @@ func TestNacksWithBatchesTracker(t *testing.T) {
func TestNackBackoffTracker(t *testing.T) {
nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy))
- nacks := newNegativeAcksTracker(nmc, testNackDelay,
new(defaultNackBackoffPolicy), log.DefaultNopLogger())
+ nacks := newNegativeAcksTracker(nmc, testNackDelay,
new(defaultNackBackoffPolicy), log.DefaultNopLogger(),
+ testNackPrecisionBit)
nacks.AddMessage(new(mockMessage1))
nacks.AddMessage(new(mockMessage2))