Gleiphir2769 commented on code in PR #991:
URL: https://github.com/apache/pulsar-client-go/pull/991#discussion_r1145845933
##########
pulsar/consumer_test.go:
##########
@@ -4113,3 +4113,204 @@ func TestConsumerBatchIndexAckDisabled(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, []byte("done"), message.Payload())
}
+
+func TestConsumerMemoryLimit(t *testing.T) {
+ // Create client 1 without memory limit
+ cli1, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer cli1.Close()
+
+ // Create client 1 with memory limit
+ cli2, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ MemoryLimitBytes: 10 * 1024,
+ })
+
+ assert.Nil(t, err)
+ defer cli2.Close()
+
+ topic := newTopicName()
+
+ // Use client 1 to create producer p1
+ p1, err := cli1.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer p1.Close()
+
+ // Use mem-limited client 2 to create consumer c1
+ c1, err := cli2.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-1",
+ Type: Exclusive,
+ EnableAutoScaledReceiverQueueSize: true,
+ })
+ assert.Nil(t, err)
+ defer c1.Close()
+ pc1 := c1.(*consumer).consumers[0]
+
+ // Fill up the messageCh of c1
+ for i := 0; i < 10; i++ {
+ p1.SendAsync(
+ context.Background(),
+ &ProducerMessage{Payload: createTestMessagePayload(1)},
+ func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ },
+ )
+ }
+
+ retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+ return assert.Equal(t, 10, len(pc1.messageCh))
+ })
+
+ // Get current receiver queue size of c1
+ prevQueueSize := pc1.currentQueueSize.Load()
+
+ // Make the client 1 exceed the memory limit
+ _, err = p1.Send(context.Background(), &ProducerMessage{
+ Payload: createTestMessagePayload(10*1024 + 1),
+ })
+ assert.NoError(t, err)
+
+ // c1 should shrink it's receiver queue size
+ retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+ return assert.Equal(t, prevQueueSize/2,
pc1.currentQueueSize.Load())
+ })
+
+ // Use mem-limited client 2 to create consumer c2
+ c2, err := cli2.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-2",
+ Type: Exclusive,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ EnableAutoScaledReceiverQueueSize: true,
+ })
+ assert.Nil(t, err)
+ defer c2.Close()
+ pc2 := c2.(*consumer).consumers[0]
+
+ // Try to induce c2 receiver queue size expansion
+ for i := 0; i < 10; i++ {
+ p1.SendAsync(
+ context.Background(),
+ &ProducerMessage{Payload: createTestMessagePayload(1)},
+ func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ },
+ )
+ }
+
+ retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+ return assert.Equal(t, 10, len(pc1.messageCh))
+ })
+
+ // c2 receiver queue size should not expansion because client 1 has
exceeded the memory limit
+ assert.Equal(t, 1, int(pc2.currentQueueSize.Load()))
+
+ // Use mem-limited client 2 to create producer p2
+ p2, err := cli2.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ DisableBlockIfQueueFull: true,
+ })
+ assert.Nil(t, err)
+ defer p2.Close()
+
+ _, err = p2.Send(context.Background(), &ProducerMessage{
+ Payload: createTestMessagePayload(1),
+ })
+ // Producer can't send message
+ assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
+}
+
+func TestMultiConsumerMemoryLimit(t *testing.T) {
+ // Create client 1 without memory limit
+ cli1, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer cli1.Close()
+
+ // Create client 1 with memory limit
+ cli2, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ MemoryLimitBytes: 10 * 1024,
+ })
+
+ assert.Nil(t, err)
+ defer cli2.Close()
+
+ topic := newTopicName()
+
+ // Use client 1 to create producer p1
+ p1, err := cli1.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer p1.Close()
+
+ // Use mem-limited client 2 to create consumer c1
+ c1, err := cli2.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-1",
+ Type: Exclusive,
+ EnableAutoScaledReceiverQueueSize: true,
+ })
+ assert.Nil(t, err)
+ defer c1.Close()
+ pc1 := c1.(*consumer).consumers[0]
+
+ // Use mem-limited client 2 to create consumer c1
+ c2, err := cli2.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-2",
+ Type: Exclusive,
+ EnableAutoScaledReceiverQueueSize: true,
+ })
+ assert.Nil(t, err)
+ defer c2.Close()
+ pc2 := c1.(*consumer).consumers[0]
Review Comment:
Done.
```
pc2 := c2.(*consumer).consumers[0]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]