cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743310068



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
        doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
-       tick         *time.Ticker
+       nackBackoff  NackBackoffPolicy
+       trackFlag    bool
        delay        time.Duration
        log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
-       t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               tick:         time.NewTicker(delay / 3),
-               delay:        delay,
-               log:          logger,
-       }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+
+       t := new(negativeAcksTracker)
+
+       // When using NackBackoffPolicy, the delay time needs to be calculated 
based on the RedeliveryCount field in
+       // the CommandMessage, so for the original default Nack() logic, we 
still keep the negativeAcksTracker created
+       // when we open a gorutine to execute the logic of `t.track()`. But for 
the NackBackoffPolicy method, we need
+       // to execute the logic of `t.track()` when AddMessage().
+       if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused on why we need an if statement. Shouldn't the 
default Implementation of the `NackBackoffPolicy` be what the current behavior 
is? The benefit of the interface is to simply the code and delegate to the 
implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
     bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
                        doneCh:       make(chan interface{}),
                        negativeAcks: make(map[messageID]time.Time),
                        nackBackoff:  bp,
                        rc:           rc,
                        log:          logger,
                }
   ```
   
    Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to