gunli commented on code in PR #1333:
URL: https://github.com/apache/pulsar-client-go/pull/1333#discussion_r1961252474
##########
pulsar/producer_partition.go:
##########
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
}
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
- p.reconnectToBroker(connectionClosed)
+ // reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+ go p.reconnectToBroker(connectionClosed)
Review Comment:
```go
func (c *connection) run() {
	pingSendTicker := time.NewTicker(c.keepAliveInterval)
	pingCheckTicker := time.NewTicker(c.keepAliveInterval)
	defer func() {
		// stop tickers
		pingSendTicker.Stop()
		pingCheckTicker.Stop()
		// all the accesses to the pendingReqs should be happened in this run loop thread,
		// including the final cleanup, to avoid the issue
		// https://github.com/apache/pulsar-client-go/issues/239
		c.failPendingRequests(errConnectionClosed)
		c.Close()
	}()
	// All reads come from the reader goroutine
	go c.reader.readFromConnection()
	go c.runPingCheck(pingCheckTicker)
	c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
		cap(c.incomingRequestsCh), len(c.incomingRequestsCh))
	go func() {
		for {
			select {
			case <-c.closeCh:
				c.failLeftRequestsWhenClose()
				return
			case req := <-c.incomingRequestsCh:
				if req == nil {
					return // TODO: this never gonna be happen
				}
				c.internalSendRequest(req)
			}
		}
	}()
	for {
		select {
		case <-c.closeCh:
			return
		case cmd := <-c.incomingCmdCh:
			c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
		case data := <-c.writeRequestsCh:
			if data == nil {
				return
			}
			c.internalWriteData(data)
		case <-pingSendTicker.C:
			c.sendPing()
		}
	}
}
```
Let's look at connection.run(). Two goroutines are responsible for sending data on the same underlying cnx: the inner goroutine calls c.internalSendRequest(req), while the main loop calls c.internalWriteData(data) and c.sendPing(). Technically speaking, that is unsafe: all write calls on a cnx should be synchronized so that they never run concurrently.