shibd commented on code in PR #976:
URL: https://github.com/apache/pulsar-client-go/pull/976#discussion_r1134916534


##########
pulsar/consumer_partition.go:
##########
@@ -1273,6 +1325,10 @@ func (pc *partitionConsumer) dispatcher() {
 
                        messages = nil
 
+                       if pc.options.autoReceiverQueueSize {
+                               pc.incomingMessages.Store(0)

Review Comment:
   We can't set it directly to 0, Because the `messageReceived` method may 
continue to write messages while cleaning up the queue. 
   
   You should reduce `incomingMessages` value at line 1322, depending on the 
size of the `m` array, until you stop encountering `null`.
   
   



##########
pulsar/consumer_test.go:
##########
@@ -3988,3 +3988,89 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse 
bool, cumulative bool, o
 
        client.Close()
 }
+
+func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create consumer
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:                             topic,
+               SubscriptionName:                  "my-sub",
+               Type:                              Exclusive,
+               ReceiverQueueSize:                 3,
+               EnableAutoScaledReceiverQueueSize: true,
+       })
+       assert.Nil(t, err)
+       pc := c.(*consumer).consumers[0]
+       assert.Equal(t, int32(1), pc.currentQueueSize.Load())
+       defer c.Close()
+
+       // create p
+       p, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer p.Close()
+
+       // send message, it will update scaleReceiverQueueHint from false to 
true
+       _, err = p.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("hello"),
+       })
+       assert.NoError(t, err)
+
+       // this will trigger receiver queue size expanding to 2 because we have 
prefetched 1 message >= currentSize 1.
+       _, err = c.Receive(context.Background())
+       assert.Nil(t, err)
+
+       // waiting for prefetched message passing from queueCh to dispatcher()
+       time.Sleep(time.Second)

Review Comment:
   Is there any other way to avoid sleep?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to