This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a6231f5  [Issue 140] Consumer should not block on received if closed. (#142)
a6231f5 is described below

commit a6231f5a7c47ef824b0c9f35c4d68243b139106f
Author: cckellogg <[email protected]>
AuthorDate: Mon Dec 23 14:43:51 2019 -0800

    [Issue 140] Consumer should not block on received if closed. (#142)
    
    * [Issue 140] Consumer should not block on received if closed.
    
    * [Issue #140] Consumer should not block on received if closed.
---
 pulsar/consumer_impl.go | 29 ++++++++++++++++++-----------
 pulsar/consumer_test.go | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 61c27a7..4a1f85e 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -48,8 +48,9 @@ type consumer struct {
        // channel used to deliver message to clients
        messageCh chan ConsumerMessage
 
-       closeCh chan struct{}
-       errorCh chan error
+       closeOnce sync.Once
+       closeCh   chan struct{}
+       errorCh   chan error
 
        log *log.Entry
 }
@@ -116,6 +117,7 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
        consumer := &consumer{
                options:   options,
                messageCh: messageCh,
+               closeCh:   make(chan struct{}),
                errorCh:   make(chan error),
                log:       log.WithField("topic", topic),
        }
@@ -226,6 +228,8 @@ func (c *consumer) Unsubscribe() error {
 func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
        for {
                select {
+               case <-c.closeCh:
+                       return nil, ErrConsumerClosed
                case cm, ok := <-c.messageCh:
                        if !ok {
                                return nil, ErrConsumerClosed
@@ -298,15 +302,18 @@ func (c *consumer) NackID(msgID MessageID) {
 }
 
 func (c *consumer) Close() {
-       var wg sync.WaitGroup
-       for i := range c.consumers {
-               wg.Add(1)
-               go func(pc *partitionConsumer) {
-                       defer wg.Done()
-                       pc.Close()
-               }(c.consumers[i])
-       }
-       wg.Wait()
+       c.closeOnce.Do(func() {
+               var wg sync.WaitGroup
+               for i := range c.consumers {
+                       wg.Add(1)
+                       go func(pc *partitionConsumer) {
+                               defer wg.Done()
+                               pc.Close()
+                       }(c.consumers[i])
+               }
+               wg.Wait()
+               close(c.closeCh)
+       })
 }
 
 var r = &random{
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11401ba..9bc2bd6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -794,3 +794,36 @@ func TestConsumerMetadata(t *testing.T) {
                assert.Equal(t, v, mv)
        }
 }
+
+// Test for issue #140
+// Don't block on receive if the consumer has been closed
+func TestConsumerReceiveErrAfterClose(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer client.Close()
+
+       topicName := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "my-sub",
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       consumer.Close()
+
+       errorCh := make(chan error)
+       go func() {
+               _, err = consumer.Receive(context.Background())
+               errorCh <- err
+       }()
+       select {
+       case <-time.After(200 * time.Millisecond):
+       case err = <-errorCh:
+       }
+       assert.Equal(t, ErrConsumerClosed, err)
+}

Reply via email to