gunli commented on PR #1249:
URL: 
https://github.com/apache/pulsar-client-go/pull/1249#issuecomment-2245565450

   Good job, but I think it can be improved. My idea is:
   1. add mutex and cancelfunc to producer
   ```go
   // partitionProducer: only the fields added by this proposal are shown here;
   // the struct's other fields are presumably elided.
   type partitionProducer struct {
   
        // reconnectMutex guards reconnectCancelFun, which is written by the
        // goroutine that starts a reconnect and read by Close.
        reconnectMutex     sync.Mutex
        // reconnectCancelFun cancels the context handed to reconnectToBroker for
        // the most recently started reconnect; it is nil until a reconnect has
        // been started.
        reconnectCancelFun context.CancelFunc
   }
   ```
   2. when reconnecting, pass a context to it so that we can cancel it from outside
   ```go
   case connectionClosed := <-p.connectClosedCh:
                        p.log.Info("runEventsLoop will reconnect in producer")
                        // Give this reconnect its own cancellable context.
                        ctx, cancel := context.WithCancel(context.Background())
                        // Publish the cancel func under the mutex so that another
                        // goroutine (Close) can abort this reconnect.
                        p.reconnectMutex.Lock()
                        p.reconnectCancelFun = cancel
                        p.reconnectMutex.Unlock()
                        // Blocks this goroutine until the reconnect finishes or is cancelled.
                        p.reconnectToBroker(ctx, connectionClosed)
                        // Always release the context once the reconnect has returned.
                        cancel()
   ```
   ```go
   func (p *partitionProducer) reconnectToBroker(ctx context.Context, 
connectionClosed *connectionClosed) {
        var maxRetry int
        if p.options.MaxReconnectToBroker == nil {
                maxRetry = -1
        } else {
                maxRetry = int(*p.options.MaxReconnectToBroker)
        }
   
        var (
                delayReconnectTime time.Duration
                defaultBackoff     = internal.DefaultBackoff{}
        )
   
        for maxRetry != 0 {
                select {
                case <-ctx.Done():
                        return // quit
                default:
   
                }
                if p.getProducerState() != producerReady {
                        // Producer is already closing
                        p.log.Info("producer state not ready, exit reconnect")
                        return
                }
   
                var assignedBrokerURL string
   
                if connectionClosed != nil && connectionClosed.HasURL() {
                        delayReconnectTime = 0
                        assignedBrokerURL = connectionClosed.assignedBrokerURL
                        connectionClosed = nil // Only attempt once
                } else if p.options.BackoffPolicy == nil {
                        delayReconnectTime = defaultBackoff.Next()
                } else {
                        delayReconnectTime = p.options.BackoffPolicy.Next()
                }
   
                p.log.WithFields(log.Fields{
                        "assignedBrokerURL":  assignedBrokerURL,
                        "delayReconnectTime": delayReconnectTime,
                }).Info("Reconnecting to broker")
                time.Sleep(delayReconnectTime)
   
                // double check
                if p.getProducerState() != producerReady {
                        // Producer is already closing
                        p.log.Info("producer state not ready, exit reconnect")
                        return
                }
   
                atomic.AddUint64(&p.epoch, 1)
                err := p.grabCnx(assignedBrokerURL)
                if err == nil {
                        // Successfully reconnected
                        p.log.WithField("cnx", 
p._getConn().ID()).Info("Reconnected producer to broker")
                        return
                }
                p.log.WithError(err).Error("Failed to create producer at 
reconnect")
                errMsg := err.Error()
                if strings.Contains(errMsg, errMsgTopicNotFound) {
                        // when topic is deleted, we should give up 
reconnection.
                        p.log.Warn("Topic not found, stop reconnecting, close 
the producer")
                        p.doClose(joinErrors(ErrTopicNotfound, err))
                        break
                }
   
                if strings.Contains(errMsg, errMsgTopicTerminated) {
                        p.log.Warn("Topic was terminated, failing pending 
messages, stop reconnecting, close the producer")
                        p.doClose(joinErrors(ErrTopicTerminated, err))
                        break
                }
   
                if strings.Contains(errMsg, 
errMsgProducerBlockedQuotaExceededException) {
                        p.log.Warn("Producer was blocked by quota exceed 
exception, failing pending messages, stop reconnecting")
                        
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
                        break
                }
   
                if strings.Contains(errMsg, errMsgProducerFenced) {
                        p.log.Warn("Producer was fenced, failing pending 
messages, stop reconnecting")
                        p.doClose(joinErrors(ErrProducerFenced, err))
                        break
                }
   
                if maxRetry > 0 {
                        maxRetry--
                }
                p.metrics.ProducersReconnectFailure.Inc()
                if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
                        p.metrics.ProducersReconnectMaxRetry.Inc()
                }
        }
   }
   ```
   3. when closing, check whether a reconnect is in progress; if so, cancel it and 
wait for the close to complete
   ```go
   // Close submits a closeProducer command on p.cmdChan and blocks until that
   // command's doneCh is signalled. It returns immediately when the producer is
   // not in the producerReady state.
   //
   // While waiting, it cancels any registered reconnect every 10ms, so that the
   // goroutine consuming p.cmdChan is not kept busy inside reconnectToBroker and
   // can get to the close command.
   func (p *partitionProducer) Close() {
        if p.getProducerState() != producerReady {
                // Producer is closing
                return
        }
   
        cp := &closeProducer{doneCh: make(chan struct{})}
        p.cmdChan <- cp
   
        // Poll rather than cancel once: a reconnect may start (and register a new
        // cancel func) after an earlier cancellation has already been issued.
        poll := time.NewTicker(10 * time.Millisecond)
        defer poll.Stop()
   
        for {
                // if we are stuck in reconnecting, cancel it
                // (calling a CancelFunc more than once is a no-op)
                p.reconnectMutex.Lock()
                if p.reconnectCancelFun != nil {
                        p.reconnectCancelFun()
                }
                p.reconnectMutex.Unlock()
   
                // wait for close producer request to complete
                select {
                case <-cp.doneCh: // close done
                        return
                case <-poll.C:
                }
        }
   }
   ```
   
   In this way, we can make sure that:
   1. the closing and reconnecting procedures run in the same 
goroutine, with no race condition;
   2. the main goroutine and the event loop goroutine share only the cancel func;
   3. when closing, we can still wait for the close to complete even if we are 
stuck in reconnecting.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to