This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36f700f  [PIP-74] Dependency of consumer client memory limit: support 
dynamic limit of consumer receiver queue (#14400)
36f700f is described below

commit 36f700f92bc7ac7b598bdf4fa12d93e4c6dadfd2
Author: JiangHaiting <[email protected]>
AuthorDate: Mon Feb 28 09:53:42 2022 +0800

    [PIP-74] Dependency of consumer client memory limit: support dynamic limit 
of consumer receiver queue (#14400)
    
    ### Motivation
    
    This is part of the work for [PIP 
74](https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits)
    We need to dynamically update `currentReceiverQueueSize` to control client memory.
    
    ### Modifications
    
    Add getter and setter methods for `ConsumerBase#currentReceiverQueueSize`.
    - For `ConsumerImpl`, we also need to update `availablePermits`.
    - For `MultiTopicsConsumerImpl`, we also need to update the inner consumers 
and resume the paused consumers.
---
 .../client/impl/DynamicReceiverQueueSizeTest.java  | 121 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  27 +++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  23 ++--
 .../client/impl/MultiTopicsConsumerImpl.java       |  31 ++++--
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   7 ++
 .../pulsar/client/impl/ConsumerImplTest.java       |   9 ++
 6 files changed, 193 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.java
new file mode 100644
index 0000000..9c81aec
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+
+import java.nio.charset.StandardCharsets;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class DynamicReceiverQueueSizeTest extends MockedPulsarServiceBaseTest {
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testConsumerImpl() throws PulsarClientException {
+        String topic = "persistent://public/default/testConsumerImpl" + 
System.currentTimeMillis();
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .receiverQueueSize(5)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        for (int i = 0; i < 10; i++) {
+            producer.send(data);
+        }
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 5);
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(consumer.getTotalIncomingMessages(), 5));
+        Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+        consumer.setCurrentReceiverQueueSize(8);
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 8);
+        Assert.assertEquals(consumer.getAvailablePermits(), 3);
+
+        consumer.setCurrentReceiverQueueSize(10);
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(consumer.getTotalIncomingMessages(), 10));
+        Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+        consumer.setCurrentReceiverQueueSize(3);
+        Assert.assertEquals(consumer.getAvailablePermits(), -7);
+        for (int i = 0; i < 7; i++) {
+            consumer.acknowledge(consumer.receive());
+            Assert.assertEquals(consumer.getAvailablePermits(), -6 + i);
+        }
+        consumer.acknowledge(consumer.receive()); //8
+        consumer.acknowledge(consumer.receive()); //9
+        consumer.acknowledge(consumer.receive()); //10
+        Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(data);
+        }
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(consumer.getTotalIncomingMessages(), 3));
+    }
+
+    @Test
+    public void testMultiConsumerImpl() throws Exception {
+        String topic = "persistent://public/default/testMultiConsumerImpl" + 
System.currentTimeMillis();
+        admin.topics().createPartitionedTopic(topic, 4);
+        @Cleanup
+        MultiTopicsConsumerImpl<byte[]> consumer = 
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .receiverQueueSize(5)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        for (int i = 0; i < 30; i++) {
+            producer.send(data);
+        }
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 5);
+        for (ConsumerImpl<byte[]> c : consumer.getConsumers()) {
+            Assert.assertEquals(c.getCurrentReceiverQueueSize(), 5);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(consumer.getAvailablePermits(), 0);
+            Assert.assertTrue(consumer.getTotalIncomingMessages() >= 5);
+            Assert.assertTrue(consumer.getTotalIncomingMessages() < 30);
+        });
+        consumer.setCurrentReceiverQueueSize(30);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(consumer.getTotalIncomingMessages(), 30);
+        });
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7f0575a..6d3e81d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -42,6 +43,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -78,7 +80,10 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> 
unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
-    protected int maxReceiverQueueSize;
+    protected final int maxReceiverQueueSize;
+    private volatile int currentReceiverQueueSize;
+    protected static final AtomicIntegerFieldUpdater<ConsumerBase> 
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ConsumerBase.class, 
"currentReceiverQueueSize");
     protected final Schema<T> schema;
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
@@ -119,6 +124,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
+        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize);
         if (conf.getBatchReceivePolicy() != null) {
             BatchReceivePolicy userBatchReceivePolicy = 
conf.getBatchReceivePolicy();
             if (userBatchReceivePolicy.getMaxNumMessages() > 
this.maxReceiverQueueSize) {
@@ -185,7 +191,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
     @Override
     public Message<T> receive(int timeout, TimeUnit unit) throws 
PulsarClientException {
-        if (conf.getReceiverQueueSize() == 0) {
+        if (getCurrentReceiverQueueSize() == 0) {
             throw new PulsarClientException.InvalidConfigurationException(
                     "Can't use receive with timeout, if the queue size is 0");
         }
@@ -721,10 +727,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
                 + '}';
     }
 
-    protected void setMaxReceiverQueueSize(int newSize) {
-        this.maxReceiverQueueSize = newSize;
-    }
-
     protected Message<T> beforeConsume(Message<T> message) {
         if (interceptors != null) {
             return interceptors.beforeConsume(this, message);
@@ -811,7 +813,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             throw new PulsarClientException.InvalidConfigurationException(
                 "Cannot use receive() when a listener has been set");
         }
-        if (conf.getReceiverQueueSize() == 0) {
+        if (getCurrentReceiverQueueSize() == 0) {
             throw new PulsarClientException.InvalidConfigurationException(
                 "Can't use batch receive, if the queue size is 0");
         }
@@ -1045,6 +1047,17 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         resetIncomingMessageSize();
     }
 
+    /**
+     * Update the size of the consumer receive queue.
+     * See {@link ConsumerBuilder#receiverQueueSize(int)}.
+     * @param newSize new size of the receiver queue.
+     */
+    protected abstract void setCurrentReceiverQueueSize(int newSize);
+
+    public int getCurrentReceiverQueueSize() {
+        return CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.get(this);
+    }
+
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
     private ExecutorService getExternalExecutor(Message<T> msg) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a3a4e2a..7d2e1b6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -138,8 +138,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private final int partitionIndex;
     private final boolean hasParentConsumer;
 
-    private final int receiverQueueRefillThreshold;
-
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final AcknowledgmentsGroupingTracker 
acknowledgmentsGroupingTracker;
     private final NegativeAcksTracker negativeAcksTracker;
@@ -254,7 +252,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
         this.partitionIndex = partitionIndex;
         this.hasParentConsumer = hasParentConsumer;
-        this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
         this.priorityLevel = conf.getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = 
conf.getSubscriptionInitialPosition();
@@ -839,8 +836,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 boolean firstTimeConnect = subscribeFuture.complete(this);
                 // if the consumer is not partitioned or is re-connected and 
is partitioned, we send the flow
                 // command to receive messages.
-                if (!(firstTimeConnect && hasParentConsumer) && 
conf.getReceiverQueueSize() != 0) {
-                    increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
+                if (!(firstTimeConnect && hasParentConsumer) && 
getCurrentReceiverQueueSize() != 0) {
+                    increaseAvailablePermits(cnx, 
getCurrentReceiverQueueSize());
                 }
             }).exceptionally((e) -> {
                 deregisterFromClientCnx();
@@ -1449,7 +1446,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return;
         }
 
-        if (conf.getReceiverQueueSize() == 0) {
+        if (getCurrentReceiverQueueSize() == 0) {
             // call interceptor and complete received callback
             trackMessage(message);
             interceptAndComplete(message, receivedFuture);
@@ -1596,7 +1593,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
         int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
 
-        while (available >= receiverQueueRefillThreshold && !paused) {
+        while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
             if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
                 sendFlowPermitsToBroker(currentCnx, available);
                 break;
@@ -1611,6 +1608,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
+    protected void setCurrentReceiverQueueSize(int newSize) {
+        checkArgument(newSize > 0, "receiver queue size should larger than 0");
+        int oldSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, 
newSize);
+        int delta = newSize - oldSize;
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] update maxReceiverQueueSize from {} to {}, 
increaseAvailablePermits by {}",
+                    topic, subscription, oldSize, newSize, delta);
+        }
+        increaseAvailablePermits(delta);
+    }
+
+    @Override
     public void pause() {
         paused = true;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 844590b..a92edff 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -85,10 +85,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     // shared incoming queue was full
     private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
-    // Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
-    // resume receiving from the paused consumer partitions
-    private final int sharedQueueResumeThreshold;
-
     // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
     AtomicInteger allTopicPartitionsNumber;
 
@@ -142,7 +138,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         this.partitionedTopics = new ConcurrentHashMap<>();
         this.consumers = new ConcurrentHashMap<>();
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
-        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
         this.allTopicPartitionsNumber = new AtomicInteger(0);
         this.startMessageId = startMessageId != null
                 ? new 
BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
@@ -187,8 +182,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 .collect(Collectors.toList());
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
-                if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
-                    setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+                if (allTopicPartitionsNumber.get() > 
getCurrentReceiverQueueSize()) {
+                    
setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
                 }
                 setState(State.Ready);
                 // We have successfully created N consumers, so we can start 
receiving messages now
@@ -244,7 +239,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
-                
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
+                
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
+                        consumer.getCurrentReceiverQueueSize());
                 internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer));
             });
         }
@@ -260,6 +256,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             messageReceived(consumer, message);
 
             int size = incomingMessages.size();
+            int maxReceiverQueueSize = getCurrentReceiverQueueSize();
+            int sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
             if (size >= maxReceiverQueueSize
                     || (size > sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty())) {
                 // mark this consumer to be resumed later: if No more space 
left in shared queue,
@@ -319,7 +317,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     private void resumeReceivingFromPausedConsumersIfNeeded() {
-        if (incomingMessages.size() <= sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty()) {
+        if (incomingMessages.size() <= getCurrentReceiverQueueSize() / 2 && 
!pausedConsumers.isEmpty()) {
             while (true) {
                 ConsumerImpl<T> consumer = pausedConsumers.poll();
                 if (consumer == null) {
@@ -1076,8 +1074,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         FutureUtil.waitForAll(futureList)
             .thenAccept(finalFuture -> {
-                if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
-                    setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+                if (allTopicPartitionsNumber.get() > 
getCurrentReceiverQueueSize()) {
+                    
setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
                 }
 
                 // We have successfully created new consumers, so we can start 
receiving messages for them
@@ -1470,4 +1468,15 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             acknowledgeCumulativeAsync(msg);
         }
     }
+
+    @Override
+    protected void setCurrentReceiverQueueSize(int newSize) {
+        checkArgument(newSize > 0, "receiver queue size should larger than 0");
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] setMaxReceiverQueueSize={}, previous={}", 
topic, subscription,
+                    getCurrentReceiverQueueSize(), newSize);
+        }
+        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
+        resumeReceivingFromPausedConsumersIfNeeded();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index fe6b929..5c7b85b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -191,4 +192,10 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
             return null;
         });
     }
+
+    @Override
+    protected void setCurrentReceiverQueueSize(int newSize) {
+        //receiver queue size is fixed as 0.
+        throw new NotImplementedException("Receiver queue size can't be 
changed in ZeroQueueConsumerImpl");
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index ebcd5c8..fb38d48 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -217,4 +217,13 @@ public class ConsumerImplTest {
 
         Assert.assertTrue(consumer.paused);
     }
+
+    @Test
+    public void testMaxReceiverQueueSize() {
+        int size = consumer.getCurrentReceiverQueueSize();
+        int permits = consumer.getAvailablePermits();
+        consumer.setCurrentReceiverQueueSize(size + 100);
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size + 
100);
+        Assert.assertEquals(consumer.getAvailablePermits(), permits + 100);
+    }
 }

Reply via email to