dferstay commented on a change in pull request #539:
URL: https://github.com/apache/pulsar-client-go/pull/539#discussion_r650127814



##########
File path: pulsar/internal/connection.go
##########
@@ -331,10 +335,15 @@ func (c *connection) waitUntilReady() error {
 }
 
 func (c *connection) failLeftRequestsWhenClose() {
+       // wait for outstanding incoming requests to complete before draining
+       // and closing the channel
+       c.incomingRequestsWG.Wait()

Review comment:
       @cckellogg ,
   At this point we are sure that the closeCh is closed and that the state of 
the connection is either connectionClosing or connectionClosed.  We wait to be 
sure that there will be no further writes to the incomingRequestsCh before 
attempting to drain and close it.  It is possible for the sending go-routine to 
be executing either of the following lines at the time this function is 
executed:
   - 
https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L552
   - 
https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L570
 
   
   As mentioned in the motivation for this MR:
   
   > When two cases of a select are valid, the case executed is chosen at 
random; see https://tour.golang.org/concurrency/5
   
   Whenever I see a channel being closed by a reader go-routine I am suspicious 
of writers attempting to write to the channel after it is closed.

##########
File path: pulsar/internal/connection.go
##########
@@ -546,8 +555,13 @@ func (c *connection) Write(data Buffer) {
 
 func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
        callback func(command *pb.BaseCommand, err error)) {
-       if c.getState() == connectionClosed {
+       c.incomingRequestsWG.Add(1)

Review comment:
       As mentioned 
https://github.com/apache/pulsar-client-go/pull/539#discussion_r650127814 we 
need to track all writes to the incomingRequestsCh




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to