gunli commented on PR #1249:
URL:
https://github.com/apache/pulsar-client-go/pull/1249#issuecomment-2245565450
Good job, but I think it can be improved. My idea is:
1. add a mutex and a cancel func to the producer
```go
type partitionProducer struct {
	// reconnectMutex guards reconnectCancelFun, which is written by the
	// event-loop goroutine and invoked from Close().
	reconnectMutex sync.Mutex
	// reconnectCancelFun cancels an in-flight reconnectToBroker call;
	// nil when no reconnect is in progress.
	reconnectCancelFun context.CancelFunc
}
```
2. when reconnecting, pass a context into the reconnect loop so that it can be cancelled from outside
```go
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
ctx, cancel := context.WithCancel(context.Background())
p.reconnectMutex.Lock()
p.reconnectCancelFun = cancel
p.reconnectMutex.Unlock()
p.reconnectToBroker(ctx, connectionClosed)
cancel()
```
```go
func (p *partitionProducer) reconnectToBroker(ctx context.Context,
connectionClosed *connectionClosed) {
var maxRetry int
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}
var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
for maxRetry != 0 {
select {
case <-ctx.Done():
return // quit
default:
}
if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
return
}
var assignedBrokerURL string
if connectionClosed != nil && connectionClosed.HasURL() {
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Only attempt once
} else if p.options.BackoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = p.options.BackoffPolicy.Next()
}
p.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
"delayReconnectTime": delayReconnectTime,
}).Info("Reconnecting to broker")
time.Sleep(delayReconnectTime)
// double check
if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
return
}
atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx(assignedBrokerURL)
if err == nil {
// Successfully reconnected
p.log.WithField("cnx",
p._getConn().ID()).Info("Reconnected producer to broker")
return
}
p.log.WithError(err).Error("Failed to create producer at
reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up
reconnection.
p.log.Warn("Topic not found, stop reconnecting, close
the producer")
p.doClose(joinErrors(ErrTopicNotfound, err))
break
}
if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending
messages, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicTerminated, err))
break
}
if strings.Contains(errMsg,
errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed
exception, failing pending messages, stop reconnecting")
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
break
}
if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending
messages, stop reconnecting")
p.doClose(joinErrors(ErrProducerFenced, err))
break
}
if maxRetry > 0 {
maxRetry--
}
p.metrics.ProducersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
}
}
```
3. when closing, check whether a reconnect is in progress; if so, cancel it
and wait for the close to finish
```go
// Close triggers an orderly shutdown of the producer and blocks until the
// close command has been processed by the event loop. While waiting, it
// repeatedly cancels any in-flight reconnect so a stuck reconnect loop
// cannot prevent the event loop from draining cmdChan.
func (p *partitionProducer) Close() {
	if p.getProducerState() != producerReady {
		// Producer is closing
		return
	}

	cp := &closeProducer{doneCh: make(chan struct{})}
	p.cmdChan <- cp

	// wait for close producer request to complete
	// NOTE: a `case <-context.Background().Done()` branch would be dead
	// code (Background is never cancelled), so we only poll doneCh.
	for {
		select {
		case <-cp.doneCh: // close done
			return
		default:
			// if we are stuck in reconnecting, cancel it
			p.reconnectMutex.Lock()
			if p.reconnectCancelFun != nil {
				p.reconnectCancelFun()
			}
			p.reconnectMutex.Unlock()
			time.Sleep(10 * time.Millisecond)
		}
	}
}
```
In this way, we can make sure that:
1. the closing and reconnecting procedures run in the same goroutine, with
no race condition;
2. the main goroutine and the event-loop goroutine share only the cancel func;
3. when closing, we can still wait for the close to finish even if we are
stuck reconnecting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]