This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8139a2c Fix panic() in internal/connection when writing to a closed
channel during close (#539)
8139a2c is described below
commit 8139a2c721d0d9f66727fac16f3705bc254d9017
Author: dferstay <[email protected]>
AuthorDate: Wed Jun 30 06:49:46 2021 -0700
Fix panic() in internal/connection when writing to a closed channel during
close (#539)
The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares
to enter the select statement
T2 - calls TriggerClose(), which closes cnx and the closeCh
T3 - run() go-routine for processing incomingRequestsCh drops into
case <-closeCh: and calls failLeftRequestsWhenClose() which drains
and closes incomingRequestsCh
T1 - resumes and drops into the select where both closeCh and
incomingRequestsCh are closed.
When two cases of a `select` are ready, the case executed is chosen at
random (see https://tour.golang.org/concurrency/5), so T1 may pick the send
on the already-closed incomingRequestsCh, which panics.
This commit introduces a connectionClosing state and a wait group to track
writes by the SendRequest() methods.
* TriggerClose() moves the connection into the connectionClosing state
before the closeCh is closed.
* The failLeftRequestsWhenClose() method waits on the waitgroup for
outstanding SendRequest() methods to complete before it closes
the incomingRequestsCh.
* The SendRequest() methods first add to the waitgroup before checking the
connection state; if the state is either closing or closed, SendRequest()
returns an error.
With the above it is not possible for a thread to attempt to add a request
to the incomingRequestsCh without being tracked by the waitgroup, and the
incomingRequestsCh will not be closed until operations tracked by the
waitgroup have completed.
Signed-off-by: Daniel Ferstay <[email protected]>
Co-authored-by: Daniel Ferstay <[email protected]>
Co-authored-by: xiaolongran <[email protected]>
---
pulsar/internal/connection.go | 27 ++++++++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e3ce02..70c1468 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,6 +89,7 @@ type connectionState int32
const (
connectionInit = iota
connectionReady
+ connectionClosing
connectionClosed
)
@@ -98,6 +99,8 @@ func (s connectionState) String() string {
return "Initializing"
case connectionReady:
return "Ready"
+ case connectionClosing:
+ return "Closing"
case connectionClosed:
return "Closed"
default:
@@ -142,6 +145,7 @@ type connection struct {
requestIDGenerator uint64
+ incomingRequestsWG sync.WaitGroup
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
closeCh chan interface{}
@@ -333,10 +337,15 @@ func (c *connection) waitUntilReady() error {
}
func (c *connection) failLeftRequestsWhenClose() {
+ // wait for outstanding incoming requests to complete before draining
+ // and closing the channel
+ c.incomingRequestsWG.Wait()
+
reqLen := len(c.incomingRequestsCh)
for i := 0; i < reqLen; i++ {
c.internalSendRequest(<-c.incomingRequestsCh)
}
+
close(c.incomingRequestsCh)
}
@@ -549,8 +558,13 @@ func (c *connection) Write(data Buffer) {
func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
- if c.getState() == connectionClosed {
+ c.incomingRequestsWG.Add(1)
+ defer c.incomingRequestsWG.Done()
+
+ state := c.getState()
+ if state == connectionClosed || state == connectionClosing {
callback(req, ErrConnectionClosed)
+
} else {
select {
case <-c.closeCh:
@@ -566,7 +580,11 @@ func (c *connection) SendRequest(requestID uint64, req
*pb.BaseCommand,
}
func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
- if c.getState() == connectionClosed {
+ c.incomingRequestsWG.Add(1)
+ defer c.incomingRequestsWG.Done()
+
+ state := c.getState()
+ if state == connectionClosed || state == connectionClosing {
return ErrConnectionClosed
}
@@ -760,6 +778,8 @@ func (c *connection) UnregisterListener(id uint64) {
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
c.closeOnce.Do(func() {
+ c.setState(connectionClosing)
+
cnx := c.cnx
if cnx != nil {
cnx.Close()
@@ -780,9 +800,10 @@ func (c *connection) Close() {
}
c.log.Info("Connection closed")
+ c.TriggerClose()
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
- c.TriggerClose()
+
c.pingTicker.Stop()
c.pingCheckTicker.Stop()