wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352175



##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
        // Decryption decryption related fields to decrypt the encrypted message
        Decryption *MessageDecryptionInfo
+
+       // If enabled, the default implementation of NackBackoffPolicy will be 
used to calculate the delay time of
+       // nack backoff, Default: false.
+       EnableDefaultNackBackoffPolicy bool

Review comment:
       If there is no `EnableDefaultNackBackoffPolicy`, it will invade the 
existing code logic. When the NackBackoffPolicy policy is empty, suppose we use 
the default NackBackoffPolicy, then when the user uses the Nack(Message) 
interface, the new implementation will be used.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Sure, will add comment for this change

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
        doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
-       tick         *time.Ticker
+       nackBackoff  NackBackoffPolicy
+       trackFlag    bool
        delay        time.Duration
        log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
-       t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               tick:         time.NewTicker(delay / 3),
-               delay:        delay,
-               log:          logger,
-       }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+
+       t := new(negativeAcksTracker)

Review comment:
       They are the same effect

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the << operation is required to cooperate with redeliveryCount, 
the unit conversion is still required, which will be converted to time.Duration 
in subsequent use.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the `<<` operation is required to cooperate with 
redeliveryCount, the unit conversion is still required, which will be converted 
to time.Duration in subsequent use.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Here we will first get the redeliveryCount object from the 
CommandMessage, and then start the << operation from minNackTimeMs to calculate 
the current length of time that nack needs to be executed, and then compare it 
with maxNackTimeMs, and take their maximum value as the nack duration. nack 
will increase from minNackTimeMs to maxNackTimeMs according to the above rule

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Here we will first get the redeliveryCount object from the 
CommandMessage, and then start the << operation from minNackTimeMs to calculate 
the current length of time that nack needs to be executed, and then compare it 
with maxNackTimeMs, and take their maximum value as the nack duration. nack 
will increase from minNackTimeMs to maxNackTimeMs according to the above rule

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
        doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
-       tick         *time.Ticker
+       nackBackoff  NackBackoffPolicy
+       trackFlag    bool
        delay        time.Duration
        log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
-       t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               tick:         time.NewTicker(delay / 3),
-               delay:        delay,
-               log:          logger,
-       }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+
+       t := new(negativeAcksTracker)
+
+       // When using NackBackoffPolicy, the delay time needs to be calculated 
based on the RedeliveryCount field in
+       // the CommandMessage, so for the original default Nack() logic, we 
still keep the negativeAcksTracker created
+       // when we open a gorutine to execute the logic of `t.track()`. But for 
the NackBackoffPolicy method, we need
+       // to execute the logic of `t.track()` when AddMessage().
+       if nackBackoffPolicy != nil {

Review comment:
       Yes, agree with your point of view. The problem here is because, for 
nackbackoff, we can't directly get the corresponding nackDelayTime, we need to 
get the redeliveryCount through the CommandMessage and then calculate the 
nackDelayTime, then we can determine the time.NewTicker based on the 
nackDelayTime. It is precisely because of such a relationship that the if 
statement is added

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
        t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Because we need to get redeliveryCount through the Message interface

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
                                        t.rc.Redeliver(msgIds)
                                }
                        }
-
                }
        }
 }
 
 func (t *negativeAcksTracker) Close() {
        // allow Close() to be invoked multiple times by consumer_partition to 
avoid panic
        t.doneOnce.Do(func() {
-               t.tick.Stop()

Review comment:
       In the current implementation situation, if we use the t.ticker in the 
struct, there will be a data race, so now we use the temporary variables of the 
ticker, and there is no good way to see how to close the temporarily created 
ticker.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Here we will first get the redeliveryCount object from the 
CommandMessage, and then start the << operation from minNackTimeMs to calculate 
the current length of time that nack needs to be executed, and then compare it 
with maxNackTimeMs, and take their maximum value as the nack duration. nack 
will increase from minNackTimeMs to maxNackTimeMs according to the above rule

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
        // Decryption decryption related fields to decrypt the encrypted message
        Decryption *MessageDecryptionInfo
+
+       // If enabled, the default implementation of NackBackoffPolicy will be 
used to calculate the delay time of
+       // nack backoff, Default: false.
+       EnableDefaultNackBackoffPolicy bool

Review comment:
       If there is no `EnableDefaultNackBackoffPolicy`, it will invade the 
existing code logic. When the NackBackoffPolicy policy is empty, suppose we use 
the default NackBackoffPolicy, then when the user uses the Nack(Message) 
interface, the new implementation will be used.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Sure, will add comment for this change

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
        doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
-       tick         *time.Ticker
+       nackBackoff  NackBackoffPolicy
+       trackFlag    bool
        delay        time.Duration
        log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
-       t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               tick:         time.NewTicker(delay / 3),
-               delay:        delay,
-               log:          logger,
-       }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+
+       t := new(negativeAcksTracker)

Review comment:
       They are the same effect

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the << operation is required to cooperate with redeliveryCount, 
the unit conversion is still required, which will be converted to time.Duration 
in subsequent use.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the `<<` operation is required to cooperate with 
redeliveryCount, the unit conversion is still required, which will be converted 
to time.Duration in subsequent use.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Here we will first get the redeliveryCount object from the 
CommandMessage, and then start the << operation from minNackTimeMs to calculate 
the current length of time that nack needs to be executed, and then compare it 
with maxNackTimeMs, and take their maximum value as the nack duration. nack 
will increase from minNackTimeMs to maxNackTimeMs according to the above rule

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Here we will first get the redeliveryCount object from the 
CommandMessage, and then start the << operation from minNackTimeMs to calculate 
the current length of time that nack needs to be executed, and then compare it 
with maxNackTimeMs, and take their maximum value as the nack duration. nack 
will increase from minNackTimeMs to maxNackTimeMs according to the above rule

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
        doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
-       tick         *time.Ticker
+       nackBackoff  NackBackoffPolicy
+       trackFlag    bool
        delay        time.Duration
        log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
-       t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               tick:         time.NewTicker(delay / 3),
-               delay:        delay,
-               log:          logger,
-       }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+
+       t := new(negativeAcksTracker)
+
+       // When using NackBackoffPolicy, the delay time needs to be calculated 
based on the RedeliveryCount field in
+       // the CommandMessage, so for the original default Nack() logic, we 
still keep the negativeAcksTracker created
+       // when we open a gorutine to execute the logic of `t.track()`. But for 
the NackBackoffPolicy method, we need
+       // to execute the logic of `t.track()` when AddMessage().
+       if nackBackoffPolicy != nil {

Review comment:
       Yes, agree with your point of view. The problem here is because, for 
nackbackoff, we can't directly get the corresponding nackDelayTime, we need to 
get the redeliveryCount through the CommandMessage and then calculate the 
nackDelayTime, then we can determine the time.NewTicker based on the 
nackDelayTime. It is precisely because of such a relationship that the if 
statement is added

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
        t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Because we need to get redeliveryCount through the Message interface

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
                                        t.rc.Redeliver(msgIds)
                                }
                        }
-
                }
        }
 }
 
 func (t *negativeAcksTracker) Close() {
        // allow Close() to be invoked multiple times by consumer_partition to 
avoid panic
        t.doneOnce.Do(func() {
-               t.tick.Stop()

Review comment:
       In the current implementation situation, if we use the t.ticker in the 
struct, there will be a data race, so now we use the temporary variables of the 
ticker, and there is no good way to see how to close the temporarily created 
ticker.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 10) // 10sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))

Review comment:
       Here we will first get the redeliveryCount object from the 
CommandMessage, and then start the << operation from minNackTimeMs to calculate 
the current length of time that nack needs to be executed, and then compare it 
with maxNackTimeMs, and take their maximum value as the nack duration. nack 
will increase from minNackTimeMs to maxNackTimeMs according to the above rule




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to