Gleiphir2769 commented on code in PR #976:
URL: https://github.com/apache/pulsar-client-go/pull/976#discussion_r1135151770
##########
pulsar/consumer_test.go:
##########
@@ -3988,3 +3988,89 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse
bool, cumulative bool, o
client.Close()
}
+
+func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ // create consumer
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 3,
+ EnableAutoScaledReceiverQueueSize: true,
+ })
+ assert.Nil(t, err)
+ pc := c.(*consumer).consumers[0]
+ assert.Equal(t, int32(1), pc.currentQueueSize.Load())
+ defer c.Close()
+
+ // create p
+ p, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer p.Close()
+
+ // send message, it will update scaleReceiverQueueHint from false to
true
+ _, err = p.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.NoError(t, err)
+
+ // this will trigger receiver queue size expanding to 2 because we have
prefetched 1 message >= currentSize 1.
+ _, err = c.Receive(context.Background())
+ assert.Nil(t, err)
+
+ // waiting for prefetched message passing from queueCh to dispatcher()
+ time.Sleep(time.Second)
Review Comment:
The `time.Sleep` at L4034 can be removed. But if L4046 is removed, `currentQueueSize`
may be 2 or 3, because `flow` and `messageReceive` run concurrently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]