Gleiphir2769 commented on code in PR #976:
URL: https://github.com/apache/pulsar-client-go/pull/976#discussion_r1135151770


##########
pulsar/consumer_test.go:
##########
@@ -3988,3 +3988,89 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse 
bool, cumulative bool, o
 
        client.Close()
 }
+
+func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create consumer
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:                             topic,
+               SubscriptionName:                  "my-sub",
+               Type:                              Exclusive,
+               ReceiverQueueSize:                 3,
+               EnableAutoScaledReceiverQueueSize: true,
+       })
+       assert.Nil(t, err)
+       pc := c.(*consumer).consumers[0]
+       assert.Equal(t, int32(1), pc.currentQueueSize.Load())
+       defer c.Close()
+
+       // create p
+       p, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer p.Close()
+
+       // send message, it will update scaleReceiverQueueHint from false to 
true
+       _, err = p.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("hello"),
+       })
+       assert.NoError(t, err)
+
+       // this will trigger receiver queue size expanding to 2 because we have 
prefetched 1 message >= currentSize 1.
+       _, err = c.Receive(context.Background())
+       assert.Nil(t, err)
+
+       // waiting for prefetched message passing from queueCh to dispatcher()
+       time.Sleep(time.Second)

Review Comment:
   The `time.Sleep` at L4034 can be removed. However, if L4046 is removed, 
`currentQueueSize` may be 2 or 3, because `flow` and `messageReceive` run concurrently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to