This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new edea3eb0 [Issue 1276] Fix multiple consumers using zeroQueueConsumer 
(#1278)
edea3eb0 is described below

commit edea3eb0198bfe772fb90690137bfb8da8121cc8
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 13 15:51:01 2024 +0800

    [Issue 1276] Fix multiple consumers using zeroQueueConsumer (#1278)
    
    * fix #1276
    
    * Update pulsar/consumer_zero_queue_test.go
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * fix docker clean
    
    ---------
    
    Co-authored-by: Zixuan Liu <[email protected]>
---
 .github/workflows/ci.yml           |  8 +++-
 pulsar/consumer_partition.go       |  5 ++-
 pulsar/consumer_zero_queue_test.go | 83 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 93 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 52781048..37290c7e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -51,8 +51,12 @@ jobs:
         go-version: [ '1.22', '1.23' ]
     steps:
       - uses: actions/checkout@v3
-      - name: clean docker cache
-        run: docker rmi $(docker images -q) -f && df -h
+      - name: Check for Docker images
+        id: check_images
+        run: echo "::set-output name=images::$(docker images -q | wc -l)"
+      - name: Clean Docker cache if images exist
+        if: ${{ steps.check_images.outputs.images > 0 }}
+        run: docker rmi $(docker images -q) -f && df -h        
       - uses: actions/setup-go@v3
         with:
           go-version: ${{ matrix.go-version }}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index dd770ce7..568dcaa9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1629,7 +1629,10 @@ func (pc *partitionConsumer) dispatcher() {
                        messages[0] = nil
                        messages = messages[1:]
 
-                       pc.availablePermits.inc()
+                       // for the zeroQueueConsumer, the permits controlled by 
itself
+                       if pc.options.receiverQueueSize > 0 {
+                               pc.availablePermits.inc()
+                       }
 
                        if pc.options.autoReceiverQueueSize {
                                pc.incomingMessages.Dec()
diff --git a/pulsar/consumer_zero_queue_test.go 
b/pulsar/consumer_zero_queue_test.go
index d8c8f7c9..34e9df9f 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -95,6 +95,89 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
        err = consumer.Unsubscribe()
        assert.Nil(t, err)
 }
+
+func TestMultipleConsumer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create consumer1
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               Type:                    Shared,
+               EnableZeroQueueConsumer: true,
+       })
+       assert.Nil(t, err)
+       _, ok := consumer1.(*zeroQueueConsumer)
+       assert.True(t, ok)
+       defer consumer1.Close()
+
+       // create consumer2
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               Type:                    Shared,
+               EnableZeroQueueConsumer: true,
+       })
+       assert.Nil(t, err)
+       _, ok = consumer2.(*zeroQueueConsumer)
+       assert.True(t, ok)
+       defer consumer2.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       sendNum := 10
+       // send 10 messages
+       for i := 0; i < sendNum; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       // receive messages
+       for i := 0; i < sendNum/2; i++ {
+               msg, err := consumer1.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+               log.Printf("consumer1 receive message: %s %s", 
msg.ID().String(), msg.Payload())
+               // ack message
+               consumer1.Ack(msg)
+       }
+
+       // receive messages
+       for i := 0; i < sendNum/2; i++ {
+               msg, err := consumer2.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+               log.Printf("consumer2 receive message: %s %s", 
msg.ID().String(), msg.Payload())
+               // ack message
+               consumer2.Ack(msg)
+       }
+
+}
+
 func TestPartitionZeroQueueConsumer(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to