This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new edea3eb0 [Issue 1276] Fix multiple consumers using zeroQueueConsumer
(#1278)
edea3eb0 is described below
commit edea3eb0198bfe772fb90690137bfb8da8121cc8
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 13 15:51:01 2024 +0800
[Issue 1276] Fix multiple consumers using zeroQueueConsumer (#1278)
* fix #1276
* Update pulsar/consumer_zero_queue_test.go
Co-authored-by: Zixuan Liu <[email protected]>
* fix docker clean
---------
Co-authored-by: Zixuan Liu <[email protected]>
---
.github/workflows/ci.yml | 8 +++-
pulsar/consumer_partition.go | 5 ++-
pulsar/consumer_zero_queue_test.go | 83 ++++++++++++++++++++++++++++++++++++++
3 files changed, 93 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 52781048..37290c7e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -51,8 +51,12 @@ jobs:
go-version: [ '1.22', '1.23' ]
steps:
- uses: actions/checkout@v3
- - name: clean docker cache
- run: docker rmi $(docker images -q) -f && df -h
+ - name: Check for Docker images
+ id: check_images
+ run: echo "::set-output name=images::$(docker images -q | wc -l)"
+ - name: Clean Docker cache if images exist
+ if: ${{ steps.check_images.outputs.images > 0 }}
+ run: docker rmi $(docker images -q) -f && df -h
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index dd770ce7..568dcaa9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1629,7 +1629,10 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]
- pc.availablePermits.inc()
+ // for the zeroQueueConsumer, the permits controlled by
itself
+ if pc.options.receiverQueueSize > 0 {
+ pc.availablePermits.inc()
+ }
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
diff --git a/pulsar/consumer_zero_queue_test.go
b/pulsar/consumer_zero_queue_test.go
index d8c8f7c9..34e9df9f 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -95,6 +95,89 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
err = consumer.Unsubscribe()
assert.Nil(t, err)
}
+
+func TestMultipleConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create consumer1
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Shared,
+ EnableZeroQueueConsumer: true,
+ })
+ assert.Nil(t, err)
+ _, ok := consumer1.(*zeroQueueConsumer)
+ assert.True(t, ok)
+ defer consumer1.Close()
+
+ // create consumer2
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Shared,
+ EnableZeroQueueConsumer: true,
+ })
+ assert.Nil(t, err)
+ _, ok = consumer2.(*zeroQueueConsumer)
+ assert.True(t, ok)
+ defer consumer2.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ sendNum := 10
+ // send 10 messages
+ for i := 0; i < sendNum; i++ {
+ msg, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ log.Printf("send message: %s", msg.String())
+ }
+
+ // receive messages
+ for i := 0; i < sendNum/2; i++ {
+ msg, err := consumer1.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.Printf("consumer1 receive message: %s %s",
msg.ID().String(), msg.Payload())
+ // ack message
+ consumer1.Ack(msg)
+ }
+
+ // receive messages
+ for i := 0; i < sendNum/2; i++ {
+ msg, err := consumer2.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.Printf("consumer2 receive message: %s %s",
msg.ID().String(), msg.Payload())
+ // ack message
+ consumer2.Ack(msg)
+ }
+
+}
+
func TestPartitionZeroQueueConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,