This is an automated email from the ASF dual-hosted git repository.
crossoverJie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c2bc7e3c test: fix TestPriorityConsumer (#1495)
c2bc7e3c is described below
commit c2bc7e3c90c8bd9b8801b93fef483c2e5de603ac
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon May 18 17:18:30 2026 +0800
test: fix TestPriorityConsumer (#1495)
---
pulsar/consumer_test.go | 30 ++++++++++++++++++------------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4e232b8a..4e2cbc4a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -296,27 +296,33 @@ func TestPriorityConsumer(t *testing.T) {
assert.Nil(t, err)
defer producer.Close()
- for i := 0; i < 10; i++ {
+ // Phase 1: Send 15 messages — distributed among the three priority-1 consumers
+ for i := 0; i < 15; i++ {
_, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
- // Drain permits from consumer1 and consumer2
- for i := 0; i < 5; i++ {
- ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
- msg, err := consumer1.Receive(ctx)
- cancel()
- assert.Nil(t, err)
- assert.NotNil(t, msg)
+ // Phase 2: Drain 20 messages from consumer1 and consumer2 to replenish their permits.
+ // After receiving, each consumer sends individual permits back to the broker,
+ // so consumer1 and consumer2 will have more permits than consumer3.
+ for i := 0; i < 20; i++ {
+ ctx1, cancel1 := context.WithTimeout(context.Background(),
2*time.Second)
+ _, _ = consumer1.Receive(ctx1)
+ cancel1()
+ ctx2, cancel2 := context.WithTimeout(context.Background(),
2*time.Second)
+ _, _ = consumer2.Receive(ctx2)
+ cancel2()
}
+
+ // Phase 3: Send 5 more messages — broker should dispatch only to consumer1/consumer2
+ // because they have more available permits at priority level 1.
for i := 0; i < 5; i++ {
- ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
- msg, err := consumer2.Receive(ctx)
- cancel()
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-extra-%d", i)),
+ })
assert.Nil(t, err)
- assert.NotNil(t, msg)
}
// Low-priority consumer should not have received any messages