This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c03f45f  Fix deadlock when connection closed (#376)
c03f45f is described below

commit c03f45fe8191cffbad9f1f658b06bba8e8ddbd4b
Author: Jia Zhai <[email protected]>
AuthorDate: Fri Oct 9 17:11:13 2020 +0800

    Fix deadlock when connection closed (#376)
    
    Fixes #366
    
    ### Motivation
    
    In the current code of `pulsar/internal/connection.go` we have two channels, 
`closeCh` and `incomingRequestsCh`. When the connection closes, the current 
misuse of these two channels may cause a deadlock.
    PR #366 has detailed steps to reproduce, along with a root-cause 
[analysis](https://github.com/apache/pulsar-client-go/pull/366#issuecomment-696759873).
    This PR fixes the deadlock.
    
    ### Modifications
    - Make the close logic independent, rather than part of the same loop that 
handles normal events.
    - When the connection is closed, drain the requests remaining in the channel 
and return an error for each, to avoid a deadlock.
    
    ### Verifying this change
    Passed the tests added in #366.
    Existing unit tests passed.
---
 go.mod                                 |  2 ++
 pulsar/consumer_partition.go           | 36 +++++++++++++++--------
 pulsar/consumer_regex_test.go          | 28 ++----------------
 pulsar/internal/commands.go            |  2 ++
 pulsar/internal/connection.go          | 54 ++++++++++++++++++++++------------
 pulsar/internal/lookup_service_test.go |  3 +-
 pulsar/internal/rpc_client.go          |  7 ++---
 pulsar/producer_partition.go           | 10 ++++---
 8 files changed, 78 insertions(+), 64 deletions(-)

diff --git a/go.mod b/go.mod
index 45b6241..3e41d06 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,8 @@ require (
        github.com/inconshreveable/mousetrap v1.0.0 // indirect
        github.com/klauspost/compress v1.10.8
        github.com/kr/pretty v0.2.0 // indirect
+       github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
+       github.com/modern-go/reflect2 v1.0.1 // indirect
        github.com/pierrec/lz4 v2.0.5+incompatible
        github.com/pkg/errors v0.9.1
        github.com/prometheus/client_golang v1.7.1
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9f555d2..8de3f38 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -150,10 +150,11 @@ type partitionConsumer struct {
        startMessageID  trackingMessageID
        lastDequeuedMsg trackingMessageID
 
-       eventsCh     chan interface{}
-       connectedCh  chan struct{}
-       closeCh      chan struct{}
-       clearQueueCh chan func(id trackingMessageID)
+       eventsCh        chan interface{}
+       connectedCh     chan struct{}
+       connectClosedCh chan connectionClosed
+       closeCh         chan struct{}
+       clearQueueCh    chan func(id trackingMessageID)
 
        nackTracker *negativeAcksTracker
        dlq         *dlqRouter
@@ -174,12 +175,13 @@ func newPartitionConsumer(parent Consumer, client 
*client, options *partitionCon
                name:                 options.consumerName,
                consumerID:           client.rpcClient.NewConsumerID(),
                partitionIdx:         int32(options.partitionIdx),
-               eventsCh:             make(chan interface{}, 3),
+               eventsCh:             make(chan interface{}, 10),
                queueSize:            int32(options.receiverQueueSize),
                queueCh:              make(chan []*message, 
options.receiverQueueSize),
                startMessageID:       options.startMessageID,
                connectedCh:          make(chan struct{}),
                messageCh:            messageCh,
+               connectClosedCh:      make(chan connectionClosed, 10),
                closeCh:              make(chan struct{}),
                clearQueueCh:         make(chan func(id trackingMessageID)),
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
@@ -566,7 +568,8 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID 
trackingMessageID) b
 
 func (pc *partitionConsumer) ConnectionClosed() {
        // Trigger reconnection in the consumer goroutine
-       pc.eventsCh <- &connectionClosed{}
+       pc.log.Debug("connection closed and send to connectClosedCh")
+       pc.connectClosedCh <- connectionClosed{}
 }
 
 // Flow command gives additional permits to send messages to the consumer.
@@ -733,11 +736,22 @@ func (pc *partitionConsumer) runEventsLoop() {
        defer func() {
                pc.log.Debug("exiting events loop")
        }()
+       pc.log.Debug("get into runEventsLoop")
+
+       go func() {
+               for {
+                       select {
+                       case <-pc.closeCh:
+                               return
+                       case <-pc.connectClosedCh:
+                               pc.log.Debug("runEventsLoop will reconnect")
+                               pc.reconnectToBroker()
+                       }
+               }
+       }()
+
        for {
-               select {
-               case <-pc.closeCh:
-                       return
-               case i := <-pc.eventsCh:
+               for i := range pc.eventsCh {
                        switch v := i.(type) {
                        case *ackRequest:
                                pc.internalAck(v)
@@ -751,8 +765,6 @@ func (pc *partitionConsumer) runEventsLoop() {
                                pc.internalSeek(v)
                        case *seekByTimeRequest:
                                pc.internalSeekByTime(v)
-                       case *connectionClosed:
-                               pc.reconnectToBroker()
                        case *closeRequest:
                                pc.internalClose(v)
                                return
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index ed27373..228fc2d 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -176,23 +176,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c 
Client, namespace string
        if err != nil {
                t.Fatal(err)
        }
-
        rc.discover()
-       time.Sleep(300 * time.Millisecond)
+       time.Sleep(2000 * time.Millisecond)
 
        consumers = cloneConsumers(rc)
        assert.Equal(t, 1, len(consumers))
-
-       // delete the topic
-       if err := deleteTopic(topic); err != nil {
-               t.Fatal(err)
-       }
-
-       rc.discover()
-       time.Sleep(300 * time.Millisecond)
-
-       consumers = cloneConsumers(rc)
-       assert.Equal(t, 0, len(consumers))
 }
 
 func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace 
string) {
@@ -228,7 +216,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
        defer deleteTopic(myTopic)
 
        rc.discover()
-       time.Sleep(300 * time.Millisecond)
+       time.Sleep(2000 * time.Millisecond)
 
        consumers = cloneConsumers(rc)
        assert.Equal(t, 0, len(consumers))
@@ -241,20 +229,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
        }
 
        rc.discover()
-       time.Sleep(300 * time.Millisecond)
+       time.Sleep(2000 * time.Millisecond)
 
        consumers = cloneConsumers(rc)
        assert.Equal(t, 1, len(consumers))
-
-       // delete the topic
-       err = deleteTopic(fooTopic)
-       assert.Nil(t, err)
-
-       rc.discover()
-       time.Sleep(300 * time.Millisecond)
-
-       consumers = cloneConsumers(rc)
-       assert.Equal(t, 0, len(consumers))
 }
 
 func TestRegexConsumer(t *testing.T) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 2536bae..a4d5e5f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -45,6 +45,8 @@ var ErrCorruptedMessage = errors.New("corrupted message")
 // ErrEOM is the error returned by ReadMessage when no more input is available.
 var ErrEOM = errors.New("EOF")
 
+var ErrConnectionClosed = errors.New("connection closed")
+
 func NewMessageReader(headersAndPayload Buffer) *MessageReader {
        return &MessageReader{
                buffer: headersAndPayload,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index aff263b..7270c30 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,7 +89,7 @@ type ConnectionListener interface {
 // Connection is a interface of client cnx.
 type Connection interface {
        SendRequest(requestID uint64, req *pb.BaseCommand, callback 
func(*pb.BaseCommand, error))
-       SendRequestNoWait(req *pb.BaseCommand)
+       SendRequestNoWait(req *pb.BaseCommand) error
        WriteData(data Buffer)
        RegisterListener(id uint64, listener ConnectionListener)
        UnregisterListener(id uint64)
@@ -110,21 +110,15 @@ type ConsumerHandler interface {
 type connectionState int32
 
 const (
-       connectionInit         = 0
-       connectionConnecting   = 1
-       connectionTCPConnected = 2
-       connectionReady        = 3
-       connectionClosed       = 4
+       connectionInit   = 0
+       connectionReady  = 1
+       connectionClosed = 2
 )
 
 func (s connectionState) String() string {
        switch s {
        case connectionInit:
                return "Initializing"
-       case connectionConnecting:
-               return "Connecting"
-       case connectionTCPConnected:
-               return "TCPConnected"
        case connectionReady:
                return "Ready"
        case connectionClosed:
@@ -286,8 +280,6 @@ func (c *connection) connect() bool {
        c.log.Info("TCP connection established")
        c.Unlock()
 
-       c.changeState(connectionTCPConnected)
-
        return true
 }
 
@@ -358,11 +350,20 @@ func (c *connection) waitUntilReady() error {
        return nil
 }
 
+func (c *connection) failLeftRequestsWhenClose() {
+       for req := range c.incomingRequestsCh {
+               c.internalSendRequest(req)
+       }
+       close(c.incomingRequestsCh)
+}
+
 func (c *connection) run() {
        // All reads come from the reader goroutine
        go c.reader.readFromConnection()
        go c.runPingCheck()
 
+       c.log.Debugf("Connection run start channel %+v, requestLength %d", c, 
len(c.incomingRequestsCh))
+
        defer func() {
                // all the accesses to the pendingReqs should be happened in 
this run loop thread,
                // including the final cleanup, to avoid the issue 
https://github.com/apache/pulsar-client-go/issues/239
@@ -379,6 +380,7 @@ func (c *connection) run() {
                for {
                        select {
                        case <-c.closeCh:
+                               c.failLeftRequestsWhenClose()
                                return
 
                        case req := <-c.incomingRequestsCh:
@@ -563,19 +565,28 @@ func (c *connection) Write(data Buffer) {
 
 func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
        callback func(command *pb.BaseCommand, err error)) {
-       c.incomingRequestsCh <- &request{
-               id:       &requestID,
-               cmd:      req,
-               callback: callback,
+       if c.state == connectionClosed {
+               callback(req, ErrConnectionClosed)
+       } else {
+               c.incomingRequestsCh <- &request{
+                       id:       &requestID,
+                       cmd:      req,
+                       callback: callback,
+               }
        }
 }
 
-func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
+func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
+       if c.state == connectionClosed {
+               return ErrConnectionClosed
+       }
+
        c.incomingRequestsCh <- &request{
                id:       nil,
                cmd:      req,
                callback: nil,
        }
+       return nil
 }
 
 func (c *connection) internalSendRequest(req *request) {
@@ -584,7 +595,14 @@ func (c *connection) internalSendRequest(req *request) {
                c.pendingReqs[*req.id] = req
        }
        c.pendingLock.Unlock()
-       c.writeCommand(req.cmd)
+       if c.state == connectionClosed {
+               c.log.Warnf("internalSendRequest failed for connectionClosed")
+               if req.callback != nil {
+                       req.callback(req.cmd, ErrConnectionClosed)
+               }
+       } else {
+               c.writeCommand(req.cmd)
+       }
 }
 
 func (c *connection) handleResponse(requestID uint64, response 
*pb.BaseCommand) {
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index fa22f18..aea9356 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -99,8 +99,9 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, 
requestID uint64, cmdType
        return nil, nil
 }
 
-func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) {
+func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) error {
        assert.Fail(c.t, "Shouldn't be called")
+       return nil
 }
 
 func responseType(r pb.CommandLookupTopicResponse_LookupType) 
*pb.CommandLookupTopicResponse_LookupType {
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index c7d810a..f53c16b 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -59,7 +59,7 @@ type RPCClient interface {
        Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
                cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error)
 
-       RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message 
proto.Message)
+       RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message 
proto.Message) error
 
        RequestOnCnx(cnx Connection, requestID uint64, cmdType 
pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 }
@@ -103,7 +103,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, request
        }
        ch := make(chan Res, 10)
 
-       // TODO: in here, the error of callback always nil
        cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
                ch <- Res{&RPCResult{
                        Cnx:      cnx,
@@ -162,9 +161,9 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID 
uint64, cmdType pb.Ba
        return rpcResult, rpcErr
 }
 
-func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) {
+func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) error {
        rpcRequestCount.Inc()
-       cnx.SendRequestNoWait(baseCommand(cmdType, message))
+       return cnx.SendRequestNoWait(baseCommand(cmdType, message))
 }
 
 func (c *rpcClient) NewRequestID() uint64 {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e8cf0f7..d245c33 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -101,6 +101,8 @@ type partitionProducer struct {
        // Channel where app is posting messages to be published
        eventsChan chan interface{}
 
+       connectClosedCh chan connectionClosed
+
        publishSemaphore internal.Semaphore
        pendingQueue     internal.BlockingQueue
        lastSequenceID   int64
@@ -133,6 +135,7 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                options:          options,
                producerID:       client.rpcClient.NewProducerID(),
                eventsChan:       make(chan interface{}, maxPendingMessages),
+               connectClosedCh:  make(chan connectionClosed, 10),
                batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
                publishSemaphore: 
internal.NewSemaphore(int32(maxPendingMessages)),
                pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -236,7 +239,7 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {
 func (p *partitionProducer) ConnectionClosed() {
        // Trigger reconnection in the produce goroutine
        p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
-       p.eventsChan <- &connectionClosed{}
+       p.connectClosedCh <- connectionClosed{}
 }
 
 func (p *partitionProducer) reconnectToBroker() {
@@ -267,15 +270,14 @@ func (p *partitionProducer) runEventsLoop() {
                        switch v := i.(type) {
                        case *sendRequest:
                                p.internalSend(v)
-                       case *connectionClosed:
-                               p.reconnectToBroker()
                        case *flushRequest:
                                p.internalFlush(v)
                        case *closeProducer:
                                p.internalClose(v)
                                return
                        }
-
+               case <-p.connectClosedCh:
+                       p.reconnectToBroker()
                case <-p.batchFlushTicker.C:
                        p.internalFlushCurrentBatch()
                }

Reply via email to