This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 25f3075  Fix unsubscribe blocked when consumer is closing or has 
closed (#457)
25f3075 is described below

commit 25f3075176f853f5f09d89b3b625e0646432735f
Author: wuYin <[email protected]>
AuthorDate: Tue Feb 9 10:43:50 2021 +0800

    Fix unsubscribe blocked when consumer is closing or has closed (#457)
    
    ### Motivation
    
    Currently, a consumer's `Close()` and `Unsubscribe()` are handled by the same 
eventloop goroutine.
    The eventloop exits after `Close()`, so a later unsubscribe event is never 
selected or handled, which causes the `Unsubscribe()` call to block.
    
    example:
    ```go
    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: 
"pulsar://localhost:6650"})
        if err != nil {
                log.Fatal(err)
        }
        defer client.Close()
    
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic:            "topic-1",
                SubscriptionName: "my-sub",
        })
        if err != nil {
                log.Fatal(err)
        }
    
        defer consumer.Unsubscribe() // unintentional
        defer consumer.Close()
    }
    ```
    
    `Unsubscribe()` blocked:
    
    
![image](https://user-images.githubusercontent.com/24536920/106294060-ab5d6b80-6289-11eb-913c-85e1d18467a0.png)
    
    
    
    
    ### Modifications
    
    Check the consumer state before sending the unsubscribe event; if the 
consumer is closing or has closed, just log an error and return.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_partition.go | 31 +++++++++++++++++++++++++------
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b9c9b12..b75a7d7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,21 @@ const (
        consumerClosed
 )
 
+func (s consumerState) String() string {
+       switch s {
+       case consumerInit:
+               return "Initializing"
+       case consumerReady:
+               return "Ready"
+       case consumerClosing:
+               return "Closing"
+       case consumerClosed:
+               return "Closed"
+       default:
+               return "Unknown"
+       }
+}
+
 type subscriptionMode int
 
 const (
@@ -195,6 +210,11 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
 }
 
 func (pc *partitionConsumer) Unsubscribe() error {
+       if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+               pc.log.WithField("state", state).Error("Failed to unsubscribe 
closing or closed consumer")
+               return nil
+       }
+
        req := &unsubscribeRequest{doneCh: make(chan struct{})}
        pc.eventsCh <- req
 
@@ -206,9 +226,8 @@ func (pc *partitionConsumer) Unsubscribe() error {
 func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
        defer close(unsub.doneCh)
 
-       state := pc.getConsumerState()
-       if state == consumerClosed || state == consumerClosing {
-               pc.log.Error("Failed to unsubscribe consumer, the consumer is 
closing or consumer has been closed")
+       if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+               pc.log.WithField("state", state).Error("Failed to unsubscribe 
closing or closed consumer")
                return
        }
 
@@ -354,7 +373,7 @@ func (pc *partitionConsumer) internalSeek(seek 
*seekRequest) {
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
        state := pc.getConsumerState()
        if state == consumerClosing || state == consumerClosed {
-               pc.log.Error("Consumer was already closed")
+               pc.log.WithField("state", state).Error("Consumer is closing or 
has closed")
                return nil
        }
 
@@ -398,7 +417,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek 
*seekByTimeRequest) {
 
        state := pc.getConsumerState()
        if state == consumerClosing || state == consumerClosed {
-               pc.log.Error("Consumer was already closed")
+               pc.log.WithField("state", pc.state).Error("Consumer is closing 
or has closed")
                return
        }
 
@@ -798,7 +817,7 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
        }
 
        if state == consumerClosed || state == consumerClosing {
-               pc.log.Error("The consumer is closing or has been closed")
+               pc.log.WithField("state", state).Error("Consumer is closing or 
has closed")
                if pc.nackTracker != nil {
                        pc.nackTracker.Close()
                }

Reply via email to