This is an automated email from the ASF dual-hosted git repository.

crossoverJie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c2bc7e3c test: fix TestPriorityConsumer (#1495)
c2bc7e3c is described below

commit c2bc7e3c90c8bd9b8801b93fef483c2e5de603ac
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon May 18 17:18:30 2026 +0800

    test: fix TestPriorityConsumer (#1495)
---
 pulsar/consumer_test.go | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4e232b8a..4e2cbc4a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -296,27 +296,33 @@ func TestPriorityConsumer(t *testing.T) {
        assert.Nil(t, err)
        defer producer.Close()
 
-       for i := 0; i < 10; i++ {
+       // Phase 1: Send 15 messages — distributed among the three priority-1 consumers
+       for i := 0; i < 15; i++ {
                _, err := producer.Send(context.Background(), &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                })
                assert.Nil(t, err)
        }
 
-       // Drain permits from consumer1 and consumer2
-       for i := 0; i < 5; i++ {
-               ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-               msg, err := consumer1.Receive(ctx)
-               cancel()
-               assert.Nil(t, err)
-               assert.NotNil(t, msg)
+       // Phase 2: Drain 20 messages from consumer1 and consumer2 to replenish their permits.
+       // After receiving, each consumer sends individual permits back to the broker,
+       // so consumer1 and consumer2 will have more permits than consumer3.
+       for i := 0; i < 20; i++ {
+               ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second)
+               _, _ = consumer1.Receive(ctx1)
+               cancel1()
+               ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
+               _, _ = consumer2.Receive(ctx2)
+               cancel2()
        }
+
+       // Phase 3: Send 5 more messages — broker should dispatch only to consumer1/consumer2
+       // because they have more available permits at priority level 1.
        for i := 0; i < 5; i++ {
-               ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-               msg, err := consumer2.Receive(ctx)
-               cancel()
+               _, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-extra-%d", i)),
+               })
                assert.Nil(t, err)
-               assert.NotNil(t, msg)
        }
 
        // Low-priority consumer should not have received any messages

Reply via email to