nodece commented on code in PR #1197:
URL: https://github.com/apache/pulsar-client-go/pull/1197#discussion_r1667704680


##########
pulsar/dlq_router.go:
##########
@@ -155,7 +156,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
        }
 
        // Retry to create producer indefinitely
-       backoff := &internal.DefaultBackoff{}
+       bo := &backoff.DefaultBackoff{}

Review Comment:
   Could you pass the backoff to the `dlqRouter`?
   
   



##########
pulsar/consumer_partition.go:
##########
@@ -581,11 +583,11 @@ func (pc *partitionConsumer) getLastMessageID() 
(*trackingMessageID, error) {
                return nil, errors.New("failed to getLastMessageID for the 
closing or closed consumer")
        }
        remainTime := pc.client.operationTimeout
-       var backoff internal.BackoffPolicy
+       var bo backoff.Policy
        if pc.options.backoffPolicy != nil {
-               backoff = pc.options.backoffPolicy
+               bo = pc.options.backoffPolicy
        } else {
-               backoff = &internal.DefaultBackoff{}
+               bo = &backoff.DefaultBackoff{}

Review Comment:
   ```suggestion
                bo = NewDefaultBackoff()
   ```



##########
pulsar/producer_partition.go:
##########
@@ -448,17 +450,22 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo 
*SchemaInfo) (schemaVer
 }
 
 func (p *partitionProducer) reconnectToBroker(connectionClosed 
*connectionClosed) {
-       var maxRetry int
+       var (
+               maxRetry                                    int
+               delayReconnectTime, totalDelayReconnectTime time.Duration
+       )
        if p.options.MaxReconnectToBroker == nil {
                maxRetry = -1
        } else {
                maxRetry = int(*p.options.MaxReconnectToBroker)
        }
 
-       var (
-               delayReconnectTime time.Duration
-               defaultBackoff     = internal.DefaultBackoff{}
-       )
+       var bo backoff.Policy
+       if p.options.BackoffPolicy != nil {
+               bo = p.options.BackoffPolicy
+       } else {
+               bo = &backoff.DefaultBackoff{}

Review Comment:
   ```suggestion
                bo = NewDefaultBackoff()
   ```



##########
pulsar/backoff/backoff.go:
##########
@@ -26,10 +26,14 @@ func init() {
        rand.Seed(time.Now().UnixNano())
 }
 
-// BackoffPolicy parameterize the following options in the reconnection logic 
to
+// Policy parameterize the following options in the reconnection logic to
 // allow users to customize the reconnection logic (minBackoff, maxBackoff and 
jitterPercentage)
-type BackoffPolicy interface {
+type Policy interface {
+       // Next returns the delay to wait before next retry
        Next() time.Duration
+
+       // IsMaxBackoffReached evaluates if the max number of retries is reached
+       IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime 
time.Duration) bool

Review Comment:
   If you agree with this idea, you can move the backoff object to the global 
scope from the local scope, which can reduce the allocation of objects.



##########
pulsar/backoff/backoff.go:
##########
@@ -26,10 +26,14 @@ func init() {
        rand.Seed(time.Now().UnixNano())
 }
 
-// BackoffPolicy parameterize the following options in the reconnection logic 
to
+// Policy parameterize the following options in the reconnection logic to
 // allow users to customize the reconnection logic (minBackoff, maxBackoff and 
jitterPercentage)
-type BackoffPolicy interface {
+type Policy interface {
+       // Next returns the delay to wait before next retry
        Next() time.Duration
+
+       // IsMaxBackoffReached evaluates if the max number of retries is reached
+       IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime 
time.Duration) bool

Review Comment:
   `delayReconnectTime` and `totalDelayReconnectTime` are unnecessary, we can 
record this time in the backoff. 
   
   Usually, we need to reset this backoff when some logic works fine, could you 
add the `Reset()` to this interface.



##########
pulsar/backoff/backoff.go:
##########
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package internal
+package backoff

Review Comment:
   Could you move this pack to the `pulsar` directory and keep `BackoffPolicy`?
   



##########
pulsar/consumer_partition.go:
##########
@@ -1680,18 +1682,23 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker(connectionClosed 
*connectionClosed) {
-       var maxRetry int
+       var (
+               maxRetry                                    int
+               delayReconnectTime, totalDelayReconnectTime time.Duration
+       )
 
        if pc.options.maxReconnectToBroker == nil {
                maxRetry = -1
        } else {
                maxRetry = int(*pc.options.maxReconnectToBroker)
        }
 
-       var (
-               delayReconnectTime time.Duration
-               defaultBackoff     = internal.DefaultBackoff{}
-       )
+       var bo backoff.Policy
+       if pc.options.backoffPolicy != nil {
+               bo = pc.options.backoffPolicy
+       } else {
+               bo = &backoff.DefaultBackoff{}

Review Comment:
   ```suggestion
                bo = NewDefaultBackoff()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to