gunli commented on code in PR #1333:
URL: https://github.com/apache/pulsar-client-go/pull/1333#discussion_r1961252474


##########
pulsar/producer_partition.go:
##########
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
                        }
                case connectionClosed := <-p.connectClosedCh:
                        p.log.Info("runEventsLoop will reconnect in producer")
-                       p.reconnectToBroker(connectionClosed)
+                       // reconnect to broker in a new goroutine so that it 
won't block the event loop, see issue #1332
+                       go p.reconnectToBroker(connectionClosed)

Review Comment:
   ```go
   func (c *connection) run() {
        pingSendTicker := time.NewTicker(c.keepAliveInterval)
        pingCheckTicker := time.NewTicker(c.keepAliveInterval)
   
        defer func() {
                // stop tickers
                pingSendTicker.Stop()
                pingCheckTicker.Stop()
   
                // all the accesses to the pendingReqs should be happened in 
this run loop thread,
                // including the final cleanup, to avoid the issue
                // https://github.com/apache/pulsar-client-go/issues/239
                c.failPendingRequests(errConnectionClosed)
                c.Close()
        }()
   
        // All reads come from the reader goroutine
        go c.reader.readFromConnection()
        go c.runPingCheck(pingCheckTicker)
   
        c.log.Debugf("Connection run starting with request capacity=%d 
queued=%d",
                cap(c.incomingRequestsCh), len(c.incomingRequestsCh))
   
        go func() {
                for {
                        select {
                        case <-c.closeCh:
                                c.failLeftRequestsWhenClose()
                                return
   
                        case req := <-c.incomingRequestsCh:
                                if req == nil {
                                        return // TODO: this never gonna be 
happen
                                }
                                c.internalSendRequest(req)
                        }
                }
        }()
   
        for {
                select {
                case <-c.closeCh:
                        return
   
                case cmd := <-c.incomingCmdCh:
                        c.internalReceivedCommand(cmd.cmd, 
cmd.headersAndPayload)
                case data := <-c.writeRequestsCh:
                        if data == nil {
                                return
                        }
                        c.internalWriteData(data)
   
                case <-pingSendTicker.C:
                        c.sendPing()
                }
        }
   }
   ```
   Let's look at connection.run(): you can see that there are 2 goroutines 
responsible for sending data (c.internalSendRequest(req) and 
c.internalWriteData(data)/c.sendPing()) on the same underlying cnx. 
Technically speaking, it is unsafe to do that; all the write calls on a cnx 
should be synchronized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to