nodece commented on code in PR #1197:
URL: https://github.com/apache/pulsar-client-go/pull/1197#discussion_r1667704680
##########
pulsar/dlq_router.go:
##########
@@ -155,7 +156,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
}
// Retry to create producer indefinitely
- backoff := &internal.DefaultBackoff{}
+ bo := &backoff.DefaultBackoff{}
Review Comment:
Could you pass the backoff to the `dlqRouter`?
##########
pulsar/consumer_partition.go:
##########
@@ -581,11 +583,11 @@ func (pc *partitionConsumer) getLastMessageID()
(*trackingMessageID, error) {
return nil, errors.New("failed to getLastMessageID for the
closing or closed consumer")
}
remainTime := pc.client.operationTimeout
- var backoff internal.BackoffPolicy
+ var bo backoff.Policy
if pc.options.backoffPolicy != nil {
- backoff = pc.options.backoffPolicy
+ bo = pc.options.backoffPolicy
} else {
- backoff = &internal.DefaultBackoff{}
+ bo = &backoff.DefaultBackoff{}
Review Comment:
```suggestion
bo = NewDefaultBackoff()
```
##########
pulsar/producer_partition.go:
##########
@@ -448,17 +450,22 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo
*SchemaInfo) (schemaVer
}
func (p *partitionProducer) reconnectToBroker(connectionClosed
*connectionClosed) {
- var maxRetry int
+ var (
+ maxRetry int
+ delayReconnectTime, totalDelayReconnectTime time.Duration
+ )
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}
- var (
- delayReconnectTime time.Duration
- defaultBackoff = internal.DefaultBackoff{}
- )
+ var bo backoff.Policy
+ if p.options.BackoffPolicy != nil {
+ bo = p.options.BackoffPolicy
+ } else {
+ bo = &backoff.DefaultBackoff{}
Review Comment:
```suggestion
bo = NewDefaultBackoff()
```
##########
pulsar/backoff/backoff.go:
##########
@@ -26,10 +26,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
-// BackoffPolicy parameterize the following options in the reconnection logic
to
+// Policy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and
jitterPercentage)
-type BackoffPolicy interface {
+type Policy interface {
+ // Next returns the delay to wait before next retry
Next() time.Duration
+
+ // IsMaxBackoffReached evaluates if the max number of retries is reached
+ IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime
time.Duration) bool
Review Comment:
If you agree with this idea, you can move the backoff object to the global
scope from the local scope, which can reduce the allocation of objects.
##########
pulsar/backoff/backoff.go:
##########
@@ -26,10 +26,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
-// BackoffPolicy parameterize the following options in the reconnection logic
to
+// Policy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and
jitterPercentage)
-type BackoffPolicy interface {
+type Policy interface {
+ // Next returns the delay to wait before next retry
Next() time.Duration
+
+ // IsMaxBackoffReached evaluates if the max number of retries is reached
+ IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime
time.Duration) bool
Review Comment:
`delayReconnectTime` and `totalDelayReconnectTime` are unnecessary, we can
record this time in the backoff.
Usually, we need to reset this backoff when some logic works fine, could you
add the `Reset()` to this interface.
##########
pulsar/backoff/backoff.go:
##########
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package internal
+package backoff
Review Comment:
Could you move this pack to the `pulsar` directory and keep `BackoffPolicy`?
##########
pulsar/consumer_partition.go:
##########
@@ -1680,18 +1682,23 @@ func (pc *partitionConsumer) internalClose(req
*closeRequest) {
}
func (pc *partitionConsumer) reconnectToBroker(connectionClosed
*connectionClosed) {
- var maxRetry int
+ var (
+ maxRetry int
+ delayReconnectTime, totalDelayReconnectTime time.Duration
+ )
if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*pc.options.maxReconnectToBroker)
}
- var (
- delayReconnectTime time.Duration
- defaultBackoff = internal.DefaultBackoff{}
- )
+ var bo backoff.Policy
+ if pc.options.backoffPolicy != nil {
+ bo = pc.options.backoffPolicy
+ } else {
+ bo = &backoff.DefaultBackoff{}
Review Comment:
```suggestion
bo = NewDefaultBackoff()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]