This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ba30600  Fix consumer not found (#196)
ba30600 is described below

commit ba306000b0dc0f9b9b983e6f5937b5abc950f05a
Author: 冉小龙 <[email protected]>
AuthorDate: Thu Mar 19 00:39:56 2020 +0800

    Fix consumer not found (#196)
    
    * Fix consumer not found
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * fix ci error
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 .github/workflows/go.yml     |  2 +-
 pulsar/consumer_partition.go | 23 ++++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 81745e9..8682423 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -14,7 +14,7 @@ jobs:
       id: go
 
     - name: Check out code into the Go module directory
-      uses: actions/checkout@v1
+      uses: actions/checkout@v2
 
     - name: Test
       run: |
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e624d82..410e1ad 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -168,6 +168,12 @@ func (pc *partitionConsumer) Unsubscribe() error {
 func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
        defer close(unsub.doneCh)
 
+       if pc.state == consumerClosed || pc.state == consumerClosing {
+               pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+               return
+       }
+
+       pc.state = consumerClosing
        requestID := pc.client.rpcClient.NewRequestID()
        cmdUnsubscribe := &pb.CommandUnsubscribe{
                RequestId:  proto.Uint64(requestID),
@@ -180,6 +186,11 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
        }
 
        pc.conn.DeleteConsumeHandler(pc.consumerID)
+       if pc.nackTracker != nil {
+               pc.nackTracker.Close()
+       }
+       pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
+       pc.state = consumerClosed
 }
 
 func (pc *partitionConsumer) getLastMessageID() (*messageID, error) {
@@ -648,6 +659,14 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
                return
        }
 
+       if pc.state == consumerClosed || pc.state == consumerClosing {
+               pc.log.Error("The consumer is closing or has been closed")
+               if pc.nackTracker != nil {
+                       pc.nackTracker.Close()
+               }
+               return
+       }
+
        pc.state = consumerClosing
        pc.log.Infof("Closing consumer=%d", pc.consumerID)
 
@@ -665,7 +684,9 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 
        pc.state = consumerClosed
        pc.conn.DeleteConsumeHandler(pc.consumerID)
-       pc.nackTracker.Close()
+       if pc.nackTracker != nil {
+               pc.nackTracker.Close()
+       }
        close(pc.closeCh)
 }
 

Reply via email to