yuanweikang2020 commented on issue #462:
URL:
https://github.com/apache/pulsar-client-go/issues/462#issuecomment-775637212
> thanks @yuanweikang2020 for this issue, can you provide more context how
> to reproduce or verify this issue?
When the Pulsar broker fails, the Pulsar client reconnects:
```
// runEventsLoop processes producer events until a closeProducer event arrives.
// Each iteration handles either one value received from p.eventsChan or one
// tick of p.batchFlushTicker. Every handler is called synchronously, so the
// loop does not pick up the next event or tick until the current handler
// returns.
func (p *partitionProducer) runEventsLoop() {
for {
select {
case i := <-p.eventsChan:
// Dispatch on the concrete event type; there is no default case, so any
// other type is dropped without action.
switch v := i.(type) {
case *sendRequest:
p.internalSend(v)
case *connectionClosed:
// Called inline: queued events and flush ticks are not serviced until
// reconnectToBroker returns.
p.reconnectToBroker()
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
// Closing the producer ends the loop and this function.
return
}
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
}
}
```
When reconnecting to the Pulsar broker:
```
func (p *partitionProducer) reconnectToBroker() {
backoff := internal.Backoff{}
for {
if p.state != producerReady {
// Producer is already closing
return
}
d := backoff.Next()
p.log.Info("Reconnecting to broker in ", d)
time.Sleep(d)
err := p.grabCnx()
if err == nil {
// Successfully reconnected
p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected
producer to broker")
return
}
}
}
```
The client then tries to get a connection repeatedly:
```
func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL)
(Connection, error) {
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
backoff := Backoff{1 * time.Second}
startTime := time.Now()
var retryTime time.Duration
if err != nil {
for time.Since(startTime) < c.requestTimeout {
retryTime = backoff.Next()
c.log.Debugf("Reconnecting to broker in {%v} with
timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
cnx, err = c.pool.GetConnection(logicalAddr,
physicalAddr)
if err == nil {
c.log.Debugf("retry connection success")
return cnx, nil
}
}
return nil, err
}
return cnx, nil
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]