shibd commented on code in PR #976:
URL: https://github.com/apache/pulsar-client-go/pull/976#discussion_r1134916534
##########
pulsar/consumer_partition.go:
##########
@@ -1273,6 +1325,10 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
+ if pc.options.autoReceiverQueueSize {
+ pc.incomingMessages.Store(0)
Review Comment:
We can't set it directly to 0, because the `messageReceived` method may
continue to write messages while the queue is being cleaned up.
Instead, reduce the `incomingMessages` value at line 1322 by the size of
each `m` array as you drain it, until you stop encountering `nil`.
##########
pulsar/consumer_test.go:
##########
@@ -3988,3 +3988,89 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o
client.Close()
}
+
+func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ // create consumer
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 3,
+ EnableAutoScaledReceiverQueueSize: true,
+ })
+ assert.Nil(t, err)
+ pc := c.(*consumer).consumers[0]
+ assert.Equal(t, int32(1), pc.currentQueueSize.Load())
+ defer c.Close()
+
+ // create p
+ p, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer p.Close()
+
+ // send message, it will update scaleReceiverQueueHint from false to true
+ _, err = p.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.NoError(t, err)
+
+ // this will trigger receiver queue size expanding to 2 because we have prefetched 1 message >= currentSize 1.
+ _, err = c.Receive(context.Background())
+ assert.Nil(t, err)
+
+ // waiting for prefetched message passing from queueCh to dispatcher()
+ time.Sleep(time.Second)
Review Comment:
Is there any other way to avoid sleep?