This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8139a2c  Fix panic() in internal/connection when writing to a closed channel during close (#539)
8139a2c is described below

commit 8139a2c721d0d9f66727fac16f3705bc254d9017
Author: dferstay <[email protected]>
AuthorDate: Wed Jun 30 06:49:46 2021 -0700

    Fix panic() in internal/connection when writing to a closed channel during close (#539)
    
    The race is as follows:
    T1 - calls SendRequestNoWait(), checks the connection state, and prepares
         to enter the select statement
    T2 - calls TriggerClose(), which closes cnx and the closeCh
    T3 - run() go-routine for processing incomingRequestsCh drops into
         case <-closeCh: and calls failLeftRequestsWhenClose() which drains
         and closes incomingRequestsCh
    T1 - resumes and drops into the select where both closeCh and
         incomingRequestsCh are closed.
    
    When two cases of a `select` are valid, the case executed is chosen at
    random; see https://tour.golang.org/concurrency/5
    
    This commit introduces a connectionClosing state and a wait group to track
    writes by the SendRequest() methods.
    * TriggerClose() moves the connection into the connectionClosing state
      before the closeCh is closed.
    * The failLeftRequestsWhenClose() method waits on the waitgroup for
      outstanding SendRequest() methods to complete before it closes
      the incomingRequestsCh.
    * The SendRequest() methods first add to the waitgroup before checking the
      connection state; if the state is either closing or closed, SendRequest()
      returns an error.
    
    With the above it is not possible for a thread to attempt to add a request
    to the incomingRequestsCh without being tracked by the waitgroup, and the
    incomingRequestsCh will not be closed until operations tracked by the
    waitgroup have completed.
    
    Signed-off-by: Daniel Ferstay <[email protected]>
    
    Co-authored-by: Daniel Ferstay <[email protected]>
    Co-authored-by: xiaolongran <[email protected]>
---
 pulsar/internal/connection.go | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e3ce02..70c1468 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,6 +89,7 @@ type connectionState int32
 const (
        connectionInit = iota
        connectionReady
+       connectionClosing
        connectionClosed
 )
 
@@ -98,6 +99,8 @@ func (s connectionState) String() string {
                return "Initializing"
        case connectionReady:
                return "Ready"
+       case connectionClosing:
+               return "Closing"
        case connectionClosed:
                return "Closed"
        default:
@@ -142,6 +145,7 @@ type connection struct {
 
        requestIDGenerator uint64
 
+       incomingRequestsWG sync.WaitGroup
        incomingRequestsCh chan *request
        incomingCmdCh      chan *incomingCmd
        closeCh            chan interface{}
@@ -333,10 +337,15 @@ func (c *connection) waitUntilReady() error {
 }
 
 func (c *connection) failLeftRequestsWhenClose() {
+       // wait for outstanding incoming requests to complete before draining
+       // and closing the channel
+       c.incomingRequestsWG.Wait()
+
        reqLen := len(c.incomingRequestsCh)
        for i := 0; i < reqLen; i++ {
                c.internalSendRequest(<-c.incomingRequestsCh)
        }
+
        close(c.incomingRequestsCh)
 }
 
@@ -549,8 +558,13 @@ func (c *connection) Write(data Buffer) {
 
 func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
        callback func(command *pb.BaseCommand, err error)) {
-       if c.getState() == connectionClosed {
+       c.incomingRequestsWG.Add(1)
+       defer c.incomingRequestsWG.Done()
+
+       state := c.getState()
+       if state == connectionClosed || state == connectionClosing {
                callback(req, ErrConnectionClosed)
+
        } else {
                select {
                case <-c.closeCh:
@@ -566,7 +580,11 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
 }
 
 func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
-       if c.getState() == connectionClosed {
+       c.incomingRequestsWG.Add(1)
+       defer c.incomingRequestsWG.Done()
+
+       state := c.getState()
+       if state == connectionClosed || state == connectionClosing {
                return ErrConnectionClosed
        }
 
@@ -760,6 +778,8 @@ func (c *connection) UnregisterListener(id uint64) {
 // broadcasting the notification on the close channel
 func (c *connection) TriggerClose() {
        c.closeOnce.Do(func() {
+               c.setState(connectionClosing)
+
                cnx := c.cnx
                if cnx != nil {
                        cnx.Close()
@@ -780,9 +800,10 @@ func (c *connection) Close() {
        }
 
        c.log.Info("Connection closed")
+       c.TriggerClose()
        // do not use changeState() since they share the same lock
        c.setState(connectionClosed)
-       c.TriggerClose()
+
        c.pingTicker.Stop()
        c.pingCheckTicker.Stop()
 

Reply via email to