RobertIndie commented on code in PR #1410:
URL: https://github.com/apache/pulsar-client-go/pull/1410#discussion_r2426101204


##########
pulsar/negative_acks_tracker.go:
##########
@@ -21,35 +21,57 @@ import (
        "sync"
        "time"
 
+       "github.com/RoaringBitmap/roaring/v2/roaring64"
        log "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/emirpasic/gods/trees/avltree"
 )
 
 type redeliveryConsumer interface {
        Redeliver(msgIDs []messageID)
 }
 
+type LedgerID = int64

Review Comment:
   Please don't export this type: `LedgerID` should stay unexported (package-private).



##########
pulsar/consumer_test.go:
##########
@@ -1225,6 +1225,201 @@ func TestConsumerNack(t *testing.T) {
        }
 }
 
+func TestNegativeAckPrecisionBitCnt(t *testing.T) {
+       const delay = 1 * time.Second
+
+       for precision := 1; precision <= 8; precision++ {
+               topicName := 
fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, 
time.Now().UnixNano())
+               ctx := context.Background()
+               client, err := NewClient(ClientOptions{URL: lookupURL})
+               assert.Nil(t, err)
+               defer client.Close()
+
+               precision := int64(precision)
+               consumer, err := client.Subscribe(ConsumerOptions{
+                       Topic:               topicName,
+                       SubscriptionName:    "sub-1",
+                       Type:                Shared,
+                       NackRedeliveryDelay: delay,
+                       NackPrecisionBit:    &precision,
+               })
+               assert.Nil(t, err)
+               defer consumer.Close()
+
+               producer, err := client.CreateProducer(ProducerOptions{
+                       Topic: topicName,
+               })
+               assert.Nil(t, err)
+               defer producer.Close()
+
+               // Send single message
+               content := "test-0"
+               _, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(content),
+               })
+               assert.Nil(t, err)
+
+               // Receive and send negative ack
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content, string(msg.Payload()))
+               consumer.Nack(msg)
+
+               // Calculate expected redelivery window
+               expectedRedelivery := time.Now().Add(delay)
+               deviation := time.Duration(int64(1)<<precision) * 
time.Millisecond
+
+               // Wait for redelivery
+               redelivered, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content, string(redelivered.Payload()))
+
+               now := time.Now()
+               // Assert that redelivery happens >= expected - deviation
+               assert.GreaterOrEqual(t, now.UnixMilli(), 
expectedRedelivery.UnixMilli()-deviation.Milliseconds())
+               // since the client ticks at an interval of delay / 3 (i.e., 
333 ms in this test),
+               // we add an extra 400 milliseconds to reduce the flaky
+               assert.LessOrEqual(t, now.UnixMilli(), 
expectedRedelivery.UnixMilli()+400)
+
+               consumer.Ack(redelivered)
+       }
+}
+
+func TestNackPrecisionBitDefaultBehavior(t *testing.T) {
+       // Test that default NackPrecisionBit (8) behaves the same as 
explicitly setting it to 8
+       // This test uses precise timing to verify that messages within 256ms 
window are grouped together
+
+       const delay = 300 * time.Millisecond // Tracker scans every 100ms 
(delay/3)
+       ctx := context.Background()
+
+       // Create two consumers with different topic names to avoid conflicts
+       topicNameDefault := 
fmt.Sprintf("testNackPrecisionBitDefault-default-%d", time.Now().UnixNano())
+       topicNameExplicit := 
fmt.Sprintf("testNackPrecisionBitDefault-explicit-%d", time.Now().UnixNano())
+
+       client, err := NewClient(ClientOptions{URL: lookupURL})
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // Consumer 1: Default NackPrecisionBit (should be 8)
+       consumerDefault, err := client.Subscribe(ConsumerOptions{
+               Topic:               topicNameDefault,
+               SubscriptionName:    "sub-default",
+               Type:                Shared,
+               NackRedeliveryDelay: delay,
+               // NackPrecisionBit is not set, should use default value of 8
+       })
+       assert.Nil(t, err)
+       defer consumerDefault.Close()
+
+       // Consumer 2: Explicit NackPrecisionBit set to 8
+       consumerExplicit, err := client.Subscribe(ConsumerOptions{
+               Topic:               topicNameExplicit,
+               SubscriptionName:    "sub-explicit",
+               Type:                Shared,
+               NackRedeliveryDelay: delay,
+               NackPrecisionBit:    Ptr(defaultNackPrecisionBit),
+       })
+       assert.Nil(t, err)
+       defer consumerExplicit.Close()
+
+       // Create producers for both topics
+       producerDefault, err := client.CreateProducer(ProducerOptions{
+               Topic: topicNameDefault,
+       })
+       assert.Nil(t, err)
+       defer producerDefault.Close()
+
+       producerExplicit, err := client.CreateProducer(ProducerOptions{
+               Topic: topicNameExplicit,
+       })
+       assert.Nil(t, err)
+       defer producerExplicit.Close()
+
+       // Test function to verify precision bit behavior for a given consumer 
and producer
+       testPrecisionBitBehavior := func(consumer Consumer, producer Producer, 
topicName string) {
+               // Wait for next 256ms boundary to align timing
+               now := time.Now()
+               ms := now.UnixMilli()
+               nextBoundary := ((ms / 256) + 1) * 256 // Next 256ms boundary
+               waitTime := time.Duration(nextBoundary-ms) * time.Millisecond
+               time.Sleep(waitTime)
+
+               // Send first message at 256ms boundary
+               content1 := fmt.Sprintf("msg1-%s", topicName)
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(content1),
+               })
+               assert.Nil(t, err)
+
+               // Wait 200ms (within 256ms window)
+               time.Sleep(200 * time.Millisecond)
+
+               // Send second message 200ms after first (still within 256ms 
window)
+               content2 := fmt.Sprintf("msg2-%s", topicName)
+               _, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(content2),
+               })
+               assert.Nil(t, err)
+
+               // Receive both messages and nack them
+               msg1, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content1, string(msg1.Payload()))
+               consumer.Nack(msg1)
+
+               msg2, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content2, string(msg2.Payload()))
+               consumer.Nack(msg2)
+
+               // Record when nacks were sent
+               nackTime := time.Now()
+               expectedRedelivery := nackTime.Add(delay)
+
+               // Wait for redeliveries - both messages should be redelivered 
together
+               // because they are within the 256ms precision bit window
+               redelivered1, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               redeliveryTime1 := time.Now()
+
+               redelivered2, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               redeliveryTime2 := time.Now()
+
+               // Verify both messages were redelivered
+               assert.True(t, string(redelivered1.Payload()) == content1 || 
string(redelivered1.Payload()) == content2,
+                       "First redelivered message should be one of the 
original messages")
+               assert.True(t, string(redelivered2.Payload()) == content1 || 
string(redelivered2.Payload()) == content2,
+                       "Second redelivered message should be one of the 
original messages")
+               assert.NotEqual(t, string(redelivered1.Payload()), 
string(redelivered2.Payload()),
+                       "Redelivered messages should be different")
+
+               // KEY TEST: Both messages should be redelivered simultaneously 
(within 10ms of each other to reduce flaky)
+               // because 256ms alignment groups both messages in the same 
redelivery interval
+               timeDiff := redeliveryTime2.Sub(redeliveryTime1)
+               assert.Less(t, timeDiff, 10*time.Millisecond,
+                       "Both redelivered messages should arrive simultaneously 
(within 10ms) due to 256ms alignment "+
+                               "placing them in the same redelivery interval")
+
+               // Redelivery should happen within expected window (considering 
256ms precision)
+               minExpected := expectedRedelivery.Add(-256 * time.Millisecond)
+               maxExpected := expectedRedelivery.Add(150 * time.Millisecond) 
// Buffer for test stability
+
+               assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), 
minExpected.UnixMilli(),
+                       "Redelivery should happen after minimum expected time")
+               assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), 
maxExpected.UnixMilli(),
+                       "Redelivery should happen before maximum expected time")
+
+               // Acknowledge both messages
+               consumer.Ack(redelivered1)
+               consumer.Ack(redelivered2)
+       }
+
+       // Test both consumers
+       testPrecisionBitBehavior(consumerDefault, producerDefault, "default")
+       testPrecisionBitBehavior(consumerExplicit, producerExplicit, "explicit")
+}

Review Comment:
   Is it possible to reuse the testing logic in 
`TestNegativeAckPrecisionBitCnt`?



##########
pulsar/negative_acks_tracker.go:
##########
@@ -153,3 +201,7 @@ func (t *negativeAcksTracker) Close() {
                t.doneCh <- nil
        })
 }
+
+func Ptr[T any](v T) *T { return &v }

Review Comment:
   Please don't export this function: `Ptr` should stay unexported (package-private).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to