Copilot commented on code in PR #1484:
URL: https://github.com/apache/pulsar-client-go/pull/1484#discussion_r3238625756
##########
pulsar/internal/connection.go:
##########
@@ -373,31 +373,21 @@ func (c *connection) waitUntilReady() error {
}
func (c *connection) failLeftRequestsWhenClose() {
- // wait for outstanding incoming requests to complete before draining
- // and closing the channel
c.incomingRequestsWG.Wait()
- ch := c.incomingRequestsCh
- go func() {
- // send a nil message to drain instead of
- // closing the channel and causing a potential panic
- //
- // if other requests come in after the nil message
- // then the RPC client will time out
- ch <- nil
- c.writeRequestsCh <- nil
- }()
- for req := range ch {
- if nil == req {
- break // we have drained the requests
- }
- c.internalSendRequest(req)
- }
- for req := range c.writeRequestsCh {
- if nil == req {
- break
+ for {
+ select {
+ case req := <-c.incomingRequestsCh:
+ if req != nil && req.callback != nil {
+ req.callback(req.cmd, ErrConnectionClosed)
+ }
+ case req := <-c.writeRequestsCh:
+ if req != nil {
+ req.data.Release()
+ }
+ default:
Review Comment:
The non-blocking drain also drains `writeRequestsCh`, but
`incomingRequestsWG` only tracks `SendRequest`/`SendRequestNoWait`; `WriteData`
can still enqueue a `dataRequest` after this select falls through to `default`
because a send on the buffered channel can win the race with the closed
`closeCh`. Once `run` returns, that buffer will never be processed or released.
Either include writes in the same close/drain synchronization, or make
`WriteData` unable to enqueue after close, before relying on a non-blocking
drain.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]