gunli commented on code in PR #1333:
URL: https://github.com/apache/pulsar-client-go/pull/1333#discussion_r1960834782
##########
pulsar/producer_partition.go:
##########
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
}
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
- p.reconnectToBroker(connectionClosed)
+ // reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+ go p.reconnectToBroker(connectionClosed)
Review Comment:
> Even with the check below, we can't be 100% sure of using the old
connection, because it's inherently concurrent.
>
> ```
> conn := p._getConn()
> if conn.Closed() {
> return
> }
> ```
I agree with that. Actually, I think it is quite strange to pass a buffer to a
connection's channel when sending a message: the buffer is then read and
written by two different goroutines, which leads to a data race. What's worse,
if a buffer is sent into a connection's channel while the connection is being
closed, the buffer ends up in an uncertain (pending) state, and we need to pay
extra attention to handling that case. Currently it is not handled when the
connection is closed by the network or by a server notification
(pb.BaseCommand_CLOSE_PRODUCER / pb.BaseCommand_CLOSE_CONSUMER). Maybe we need
a new PR to handle this.
In my opinion, it would be better to keep the message in the pending queue
and use timeout events and server ack events to determine whether a message has
timed out or succeeded.