This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d99d2c06 [improve] modify the negativeACK structure to reduce memory 
overhead (#1410)
d99d2c06 is described below

commit d99d2c06cf591a3b47db9d890160d3a5a6689b32
Author: Guangyang Deng <[email protected]>
AuthorDate: Sat Oct 18 15:32:50 2025 +0800

    [improve] modify the negativeACK structure to reduce memory overhead (#1410)
    
    * improve: modify the structure of NegativeAckTracker to reduce memory 
overhead
    
    * fix TestNegativeAckPrecisionBitCnt bug
    
    * chore: formatting issues
    
    * fix: fix issues with TestConsumerNack
    
    * chore: revert changes to TestConsumerNack
    
    * fix: add defaultNackPrecisionBitVal
    
    * fix: revert changes to TestZeroQueueConsumer_Nack
    
    * add TestNackPrecisionBitDefaultBehavior
    
    * chore: update TestNegativeAckPrecisionBitCnt test
    
    * fix: make TestNegativeAckPrecisionBitCnt more stable
---
 go.mod                               |   5 +-
 go.sum                               |  11 ++-
 pulsar/consumer.go                   |   5 ++
 pulsar/consumer_impl.go              |   7 ++
 pulsar/consumer_partition.go         |   4 +-
 pulsar/consumer_test.go              |  96 +++++++++++++++++++++++++
 pulsar/negative_acks_tracker.go      | 136 ++++++++++++++++++++++++-----------
 pulsar/negative_acks_tracker_test.go |   9 ++-
 8 files changed, 224 insertions(+), 49 deletions(-)

diff --git a/go.mod b/go.mod
index 6c388209..0e9fb7d8 100644
--- a/go.mod
+++ b/go.mod
@@ -6,9 +6,11 @@ require (
        github.com/99designs/keyring v1.2.1
        github.com/AthenZ/athenz v1.12.13
        github.com/DataDog/zstd v1.5.0
-       github.com/bits-and-blooms/bitset v1.4.0
+       github.com/RoaringBitmap/roaring/v2 v2.8.0
+       github.com/bits-and-blooms/bitset v1.12.0
        github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
+       github.com/emirpasic/gods v1.18.1
        github.com/docker/docker v28.0.0+incompatible
        github.com/docker/go-connections v0.5.0
        github.com/golang-jwt/jwt/v5 v5.2.2
@@ -78,6 +80,7 @@ require (
        github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
        github.com/modern-go/reflect2 v1.0.2 // indirect
        github.com/morikuni/aec v1.0.0 // indirect
+       github.com/mschoch/smat v0.2.0 // indirect
        github.com/mtibben/percent v0.2.1 // indirect
        github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // 
indirect
        github.com/nxadm/tail v1.4.8 // indirect
diff --git a/go.sum b/go.sum
index c7183334..44ec10eb 100644
--- a/go.sum
+++ b/go.sum
@@ -14,12 +14,14 @@ github.com/DataDog/zstd v1.5.0 
h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
 github.com/DataDog/zstd v1.5.0/go.mod 
h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
 github.com/Microsoft/go-winio v0.6.2 
h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
 github.com/Microsoft/go-winio v0.6.2/go.mod 
h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
+github.com/RoaringBitmap/roaring/v2 v2.8.0 
h1:y1rdtixfXvaITKzkfiKvScI0hlBJHe9sfzJp8cgeM7w=
+github.com/RoaringBitmap/roaring/v2 v2.8.0/go.mod 
h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
 github.com/ardielle/ardielle-go v1.5.2 
h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod 
h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
-github.com/bits-and-blooms/bitset v1.4.0 
h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8=
-github.com/bits-and-blooms/bitset v1.4.0/go.mod 
h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
+github.com/bits-and-blooms/bitset v1.12.0 
h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
+github.com/bits-and-blooms/bitset v1.12.0/go.mod 
h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b 
h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod 
h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
 github.com/cenkalti/backoff/v4 v4.2.1 
h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
@@ -53,6 +55,8 @@ github.com/docker/go-units v0.5.0 
h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
 github.com/docker/go-units v0.5.0/go.mod 
h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
 github.com/dvsekhvalnov/jose2go v1.6.0 
h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
 github.com/dvsekhvalnov/jose2go v1.6.0/go.mod 
h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
+github.com/emirpasic/gods v1.18.1 
h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod 
h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/felixge/httpsnoop v1.0.4 
h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
 github.com/felixge/httpsnoop v1.0.4/go.mod 
h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -154,6 +158,8 @@ github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
 github.com/modern-go/reflect2 v1.0.2/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
 github.com/morikuni/aec v1.0.0/go.mod 
h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
+github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
+github.com/mschoch/smat v0.2.0/go.mod 
h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/mtibben/percent v0.2.1 
h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
 github.com/mtibben/percent v0.2.1/go.mod 
h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
@@ -346,6 +352,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 80bbf01c..7996335a 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -191,6 +191,11 @@ type ConsumerOptions struct {
        // processed. Default is 1 min. (See `Consumer.Nack()`)
        NackRedeliveryDelay time.Duration
 
+       // NackPrecisionBit specifies the precision bit for nack redelivery 
delay.
+       // This is used to trim the lower bits of the nack redelivery delay to 
reduce memory usage.
+       // Default is 8 bits.
+       NackPrecisionBit *int64
+
        // Name specifies the consumer name.
        Name string
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 05db0726..08c825e3 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -118,6 +118,12 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
        }
 
+       if options.NackPrecisionBit == nil {
+               options.NackPrecisionBit = ptr(defaultNackPrecisionBit)
+       } else if *options.NackPrecisionBit < 0 {
+               return nil, newError(InvalidConfiguration, "NackPrecisionBit 
cannot be negative")
+       }
+
        // did the user pass in a message channel?
        messageCh := options.MessageChannel
        if options.MessageChannel == nil {
@@ -452,6 +458,7 @@ func newPartitionConsumerOpts(topic, consumerName string, 
idx int, options Consu
                receiverQueueSize:           options.ReceiverQueueSize,
                nackRedeliveryDelay:         nackRedeliveryDelay,
                nackBackoffPolicy:           options.NackBackoffPolicy,
+               nackPrecisionBit:            options.NackPrecisionBit,
                metadata:                    options.Properties,
                subProperties:               options.SubscriptionProperties,
                replicateSubscriptionState:  options.ReplicateSubscriptionState,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fcde4f3a..bfd44b18 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
        autoReceiverQueueSize       bool
        nackRedeliveryDelay         time.Duration
        nackBackoffPolicy           NackBackoffPolicy
+       nackPrecisionBit            *int64
        metadata                    map[string]string
        subProperties               map[string]string
        replicateSubscriptionState  bool
@@ -424,7 +425,8 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
 
        pc.decryptor = decryptor
 
-       pc.nackTracker = newNegativeAcksTracker(pc, 
options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
+       pc.nackTracker = newNegativeAcksTracker(pc, 
options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log,
+               options.nackPrecisionBit)
 
        err := pc.grabConn("")
        if err != nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index c36f1f59..9f20ed84 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1225,6 +1225,102 @@ func TestConsumerNack(t *testing.T) {
        }
 }
 
+func TestNegativeAckPrecisionBitCnt(t *testing.T) {
+       // Validate behavior across precision bits and default (nil -> 8)
+       const delay = 300 * time.Millisecond // Tracker scans every 100ms 
(delay/3)
+       ctx := context.Background()
+
+       client, err := NewClient(ClientOptions{URL: lookupURL})
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // Helper to verify behavior for a given NackPrecisionBit and boundary 
bits.
+       testPrecisionBitBehavior := func(nackPrecisionBit *int64, boundaryBits 
int64) {
+               // Create topic, consumer and producer inside the function
+               topicName := fmt.Sprintf("testNackPrecisionBit-%d-%d", 
boundaryBits, time.Now().UnixNano())
+               consumer, err := client.Subscribe(ConsumerOptions{
+                       Topic:               topicName,
+                       SubscriptionName:    fmt.Sprintf("sub-%d", 
boundaryBits),
+                       Type:                Shared,
+                       NackRedeliveryDelay: delay,
+                       NackPrecisionBit:    nackPrecisionBit, // can be nil 
for default behavior
+               })
+               assert.Nil(t, err)
+               defer consumer.Close()
+
+               producer, err := client.CreateProducer(ProducerOptions{Topic: 
topicName})
+               assert.Nil(t, err)
+               defer producer.Close()
+
+               // Align to the next window boundary based on boundaryBits
+               windowMs := int64(1) << boundaryBits
+               nowMs := time.Now().UnixMilli()
+               nextBoundaryMs := ((nowMs / windowMs) + 1) * windowMs // Next 
boundary
+               time.Sleep(time.Duration(nextBoundaryMs-nowMs) * 
time.Millisecond)
+
+               // Send first message at the boundary
+               content1 := fmt.Sprintf("msg1-p%d", boundaryBits)
+               _, err = producer.Send(ctx, &ProducerMessage{Payload: 
[]byte(content1)})
+               assert.Nil(t, err)
+
+               // Send second message around 3/4 into the window (still in 
same window)
+               time.Sleep(time.Duration(windowMs*3/4) * time.Millisecond)
+               content2 := fmt.Sprintf("msg2-p%d", boundaryBits)
+               _, err = producer.Send(ctx, &ProducerMessage{Payload: 
[]byte(content2)})
+               assert.Nil(t, err)
+
+               // Receive and nack both messages
+               m1, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content1, string(m1.Payload()))
+               consumer.Nack(m1)
+               m2, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content2, string(m2.Payload()))
+               consumer.Nack(m2)
+
+               // Expected redelivery window considering precision and tracker 
tick
+               expected := time.Now().Add(delay)
+               deviation := time.Duration(windowMs) * time.Millisecond
+
+               // Both should be redelivered in the same cycle
+               rm1, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               redeliveryTime1 := time.Now()
+               rm2, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               redeliveryTime2 := time.Now()
+
+               // For both the default precision (nil) and precisionBit=8, 
boundaryBits is 8.
+               // This checks that the default precisionBit is correctly set 
to 8,
+               // and that its redelivery behavior matches a consumer 
explicitly configured with precisionBit=8.
+               if boundaryBits == 8 {
+                       assert.InDelta(t, redeliveryTime1.UnixMilli(), 
redeliveryTime2.UnixMilli(), 1)
+               }
+
+               // Redelivery should occur within [expected-window, 
expected+buffer]
+               minExpected := expected.Add(-deviation)
+               maxExpected := expected.Add(150 * time.Millisecond)
+               assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), 
minExpected.UnixMilli())
+               assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), 
maxExpected.UnixMilli())
+
+               consumer.Ack(rm1)
+               consumer.Ack(rm2)
+       }
+
+       // Run for precision bits 1...8 with matching boundary bits
+       for bits := int64(1); bits <= int64(8); bits++ {
+               t.Run(fmt.Sprintf("PrecisionBits=%d", bits), func(_ *testing.T) 
{
+                       testPrecisionBitBehavior(ptr(bits), bits)
+               })
+       }
+
+       // Default behavior (nil) should match precision bit 8
+       t.Run("DefaultPrecisionBits=8", func(_ *testing.T) {
+               testPrecisionBitBehavior(nil, int64(8))
+       })
+}
+
 func TestConsumerCompression(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 1331c7df..ac9f08d7 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -21,35 +21,57 @@ import (
        "sync"
        "time"
 
+       "github.com/RoaringBitmap/roaring/v2/roaring64"
        log "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/emirpasic/gods/trees/avltree"
 )
 
 type redeliveryConsumer interface {
        Redeliver(msgIDs []messageID)
 }
 
+type ledgerID = int64
+
 type negativeAcksTracker struct {
        sync.Mutex
 
-       doneCh       chan interface{}
-       doneOnce     sync.Once
-       negativeAcks map[messageID]time.Time
-       rc           redeliveryConsumer
-       nackBackoff  NackBackoffPolicy
-       tick         *time.Ticker
-       delay        time.Duration
-       log          log.Logger
+       doneCh           chan interface{}
+       doneOnce         sync.Once
+       negativeAcks     *avltree.Tree
+       nackPrecisionBit *int64
+       rc               redeliveryConsumer
+       nackBackoff      NackBackoffPolicy
+       tick             *time.Ticker
+       delay            time.Duration
+       log              log.Logger
 }
 
 func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
-       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger, 
nackPrecisionBit *int64) *negativeAcksTracker {
 
        t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               nackBackoff:  nackBackoffPolicy,
-               log:          logger,
+               doneCh: make(chan interface{}),
+               negativeAcks: avltree.NewWith(func(a, b interface{}) int {
+                       // Perform type assertions and handle invalid types.
+                       timeA, okA := a.(time.Time)
+                       timeB, okB := b.(time.Time)
+
+                       if !okA || !okB {
+                               panic("invalid type: both values must be of 
type time.Time")
+                       }
+
+                       // Compare the two time.Time values.
+                       if timeA.Before(timeB) {
+                               return -1
+                       } else if timeA.After(timeB) {
+                               return 1
+                       }
+                       return 0 // Equal times.
+               }),
+               rc:               rc,
+               nackBackoff:      nackBackoffPolicy,
+               log:              logger,
+               nackPrecisionBit: nackPrecisionBit,
        }
 
        if nackBackoffPolicy != nil {
@@ -65,6 +87,37 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay 
time.Duration,
        return t
 }
 
+func trimLowerBit(ts int64, precisionBit int64) int64 {
+       if precisionBit <= 0 {
+               return ts
+       }
+       mask := ^((int64(1) << precisionBit) - 1)
+       return ts & mask
+}
+
+func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay 
time.Duration) {
+       t.Lock()
+       defer t.Unlock()
+
+       targetTime := time.Now().Add(delay)
+       trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), 
*t.nackPrecisionBit))
+       // try get trimmedTime
+       value, exists := t.negativeAcks.Get(trimmedTime)
+       if !exists {
+               newMap := make(map[ledgerID]*roaring64.Bitmap)
+               t.negativeAcks.Put(trimmedTime, newMap)
+               value = newMap
+       }
+       bitmapMap, ok := value.(map[ledgerID]*roaring64.Bitmap)
+       if !ok {
+               panic("negativeAcksTracker: value is not of expected type 
map[LedgerID]*roaring64.Bitmap")
+       }
+       if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists {
+               bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap()
+       }
+       bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID))
+}
+
 func (t *negativeAcksTracker) Add(msgID *messageID) {
        // Always clear up the batch index since we want to track the nack
        // for the entire batch
@@ -74,17 +127,7 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
                batchIdx: 0,
        }
 
-       t.Lock()
-       defer t.Unlock()
-
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
-       targetTime := time.Now().Add(t.delay)
-       t.negativeAcks[batchMsgID] = targetTime
+       putNackEntry(t, &batchMsgID, t.delay)
 }
 
 func (t *negativeAcksTracker) AddMessage(msg Message) {
@@ -100,17 +143,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
                batchIdx: 0,
        }
 
-       t.Lock()
-       defer t.Unlock()
-
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
-       targetTime := time.Now().Add(nackBackoffDelay)
-       t.negativeAcks[batchMsgID] = targetTime
+       putNackEntry(t, &batchMsgID, nackBackoffDelay)
 }
 
 func (t *negativeAcksTracker) track() {
@@ -127,13 +160,28 @@ func (t *negativeAcksTracker) track() {
 
                                t.Lock()
 
-                               for msgID, targetTime := range t.negativeAcks {
-                                       t.log.Debugf("MsgId: %v -- targetTime: 
%v -- now: %v", msgID, targetTime, now)
-                                       if targetTime.Before(now) {
-                                               t.log.Debugf("Adding MsgId: 
%v", msgID)
-                                               msgIDs = append(msgIDs, msgID)
-                                               delete(t.negativeAcks, msgID)
+                               iterator := t.negativeAcks.Iterator()
+                               for iterator.Next() {
+                                       targetTime := iterator.Key().(time.Time)
+                                       // because use ordered map, so we can 
early break
+                                       if targetTime.After(now) {
+                                               break
+                                       }
+
+                                       ledgerMap := 
iterator.Value().(map[ledgerID]*roaring64.Bitmap)
+                                       for ledgerID, entrySet := range 
ledgerMap {
+                                               for _, entryID := range 
entrySet.ToArray() {
+                                                       msgID := messageID{
+                                                               ledgerID: 
ledgerID,
+                                                               entryID:  
int64(entryID),
+                                                               batchIdx: 0,
+                                                       }
+                                                       msgIDs = append(msgIDs, 
msgID)
+                                               }
                                        }
+
+                                       // Safe deletion during iteration
+                                       t.negativeAcks.Remove(targetTime)
                                }
 
                                t.Unlock()
@@ -153,3 +201,7 @@ func (t *negativeAcksTracker) Close() {
                t.doneCh <- nil
        })
 }
+
+func ptr[T any](v T) *T { return &v }
+
+const defaultNackPrecisionBit = int64(8)
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index 3f03ac44..bf395346 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -29,6 +29,8 @@ import (
 
 const testNackDelay = 300 * time.Millisecond
 
+var testNackPrecisionBit = ptr(defaultNackPrecisionBit)
+
 type nackMockedConsumer struct {
        ch     chan messageID
        closed bool
@@ -80,7 +82,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 
 func TestNacksTracker(t *testing.T) {
        nmc := newNackMockedConsumer(nil)
-       nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, 
log.DefaultNopLogger())
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, 
log.DefaultNopLogger(), testNackPrecisionBit)
 
        nacks.Add(&messageID{
                ledgerID: 1,
@@ -113,7 +115,7 @@ func TestNacksTracker(t *testing.T) {
 
 func TestNacksWithBatchesTracker(t *testing.T) {
        nmc := newNackMockedConsumer(nil)
-       nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, 
log.DefaultNopLogger())
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, 
log.DefaultNopLogger(), testNackPrecisionBit)
 
        nacks.Add(&messageID{
                ledgerID: 1,
@@ -156,7 +158,8 @@ func TestNacksWithBatchesTracker(t *testing.T) {
 
 func TestNackBackoffTracker(t *testing.T) {
        nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy))
-       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
new(defaultNackBackoffPolicy), log.DefaultNopLogger())
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
new(defaultNackBackoffPolicy), log.DefaultNopLogger(),
+               testNackPrecisionBit)
 
        nacks.AddMessage(new(mockMessage1))
        nacks.AddMessage(new(mockMessage2))

Reply via email to