This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36f700f [PIP-74] Dependency of consumer client memory limit: support
dynamic limit of consumer receiver queue (#14400)
36f700f is described below
commit 36f700f92bc7ac7b598bdf4fa12d93e4c6dadfd2
Author: JiangHaiting <[email protected]>
AuthorDate: Mon Feb 28 09:53:42 2022 +0800
[PIP-74] Dependency of consumer client memory limit: support dynamic limit
of consumer receiver queue (#14400)
### Motivation
This is part of the work for [PIP
74](https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits)
We need to dynamically update `currentReceiverQueueSize` to control client memory usage.
### Modifications
Add getter and setter methods for `ConsumerBase#currentReceiverQueueSize`.
- For `ConsumerImpl`, we also need to update `availablePermits`.
- For `MultiTopicsConsumerImpl`, we also need to update the inner consumers
and trigger paused consumers to resume receiving.
---
.../client/impl/DynamicReceiverQueueSizeTest.java | 121 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 27 +++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 23 ++--
.../client/impl/MultiTopicsConsumerImpl.java | 31 ++++--
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 7 ++
.../pulsar/client/impl/ConsumerImplTest.java | 9 ++
6 files changed, 193 insertions(+), 25 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.java
new file mode 100644
index 0000000..9c81aec
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+
+import java.nio.charset.StandardCharsets;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class DynamicReceiverQueueSizeTest extends MockedPulsarServiceBaseTest {
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ // cleanup() is the tear-down counterpart of setup(): it has to run after the tests of this
+ // class, so it is registered with @AfterClass rather than as a second @BeforeClass hook.
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testConsumerImpl() throws PulsarClientException {
+ String topic = "persistent://public/default/testConsumerImpl" +
System.currentTimeMillis();
+ @Cleanup
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .receiverQueueSize(5)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+ for (int i = 0; i < 10; i++) {
+ producer.send(data);
+ }
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 5);
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumer.getTotalIncomingMessages(), 5));
+ Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+ consumer.setCurrentReceiverQueueSize(8);
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 8);
+ Assert.assertEquals(consumer.getAvailablePermits(), 3);
+
+ consumer.setCurrentReceiverQueueSize(10);
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumer.getTotalIncomingMessages(), 10));
+ Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+ consumer.setCurrentReceiverQueueSize(3);
+ Assert.assertEquals(consumer.getAvailablePermits(), -7);
+ for (int i = 0; i < 7; i++) {
+ consumer.acknowledge(consumer.receive());
+ Assert.assertEquals(consumer.getAvailablePermits(), -6 + i);
+ }
+ consumer.acknowledge(consumer.receive()); //8
+ consumer.acknowledge(consumer.receive()); //9
+ consumer.acknowledge(consumer.receive()); //10
+ Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(data);
+ }
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumer.getTotalIncomingMessages(), 3));
+ }
+
+ @Test
+ public void testMultiConsumerImpl() throws Exception {
+ String topic = "persistent://public/default/testMultiConsumerImpl" +
System.currentTimeMillis();
+ admin.topics().createPartitionedTopic(topic, 4);
+ @Cleanup
+ MultiTopicsConsumerImpl<byte[]> consumer =
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .receiverQueueSize(5)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+ for (int i = 0; i < 30; i++) {
+ producer.send(data);
+ }
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 5);
+ for (ConsumerImpl<byte[]> c : consumer.getConsumers()) {
+ Assert.assertEquals(c.getCurrentReceiverQueueSize(), 5);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(consumer.getAvailablePermits(), 0);
+ Assert.assertTrue(consumer.getTotalIncomingMessages() >= 5);
+ Assert.assertTrue(consumer.getTotalIncomingMessages() < 30);
+ });
+ consumer.setCurrentReceiverQueueSize(30);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(consumer.getTotalIncomingMessages(), 30);
+ });
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7f0575a..6d3e81d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -42,6 +43,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -78,7 +80,10 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
final BlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives;
- protected int maxReceiverQueueSize;
+ protected final int maxReceiverQueueSize;
+ private volatile int currentReceiverQueueSize;
+ protected static final AtomicIntegerFieldUpdater<ConsumerBase>
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ConsumerBase.class,
"currentReceiverQueueSize");
protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
protected final BatchReceivePolicy batchReceivePolicy;
@@ -119,6 +124,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
this.interceptors = interceptors;
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize);
if (conf.getBatchReceivePolicy() != null) {
BatchReceivePolicy userBatchReceivePolicy =
conf.getBatchReceivePolicy();
if (userBatchReceivePolicy.getMaxNumMessages() >
this.maxReceiverQueueSize) {
@@ -185,7 +191,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public Message<T> receive(int timeout, TimeUnit unit) throws
PulsarClientException {
- if (conf.getReceiverQueueSize() == 0) {
+ if (getCurrentReceiverQueueSize() == 0) {
throw new PulsarClientException.InvalidConfigurationException(
"Can't use receive with timeout, if the queue size is 0");
}
@@ -721,10 +727,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
+ '}';
}
- protected void setMaxReceiverQueueSize(int newSize) {
- this.maxReceiverQueueSize = newSize;
- }
-
protected Message<T> beforeConsume(Message<T> message) {
if (interceptors != null) {
return interceptors.beforeConsume(this, message);
@@ -811,7 +813,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
throw new PulsarClientException.InvalidConfigurationException(
"Cannot use receive() when a listener has been set");
}
- if (conf.getReceiverQueueSize() == 0) {
+ if (getCurrentReceiverQueueSize() == 0) {
throw new PulsarClientException.InvalidConfigurationException(
"Can't use batch receive, if the queue size is 0");
}
@@ -1045,6 +1047,17 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
resetIncomingMessageSize();
}
+ /**
+ * Update the size of the consumer receive queue.
+ * See {@link ConsumerBuilder#receiverQueueSize(int)}.
+ * @param newSize new size of the receiver queue.
+ */
+ protected abstract void setCurrentReceiverQueueSize(int newSize);
+
+ public int getCurrentReceiverQueueSize() {
+ return CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.get(this);
+ }
+
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
private ExecutorService getExternalExecutor(Message<T> msg) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a3a4e2a..7d2e1b6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -138,8 +138,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final int partitionIndex;
private final boolean hasParentConsumer;
- private final int receiverQueueRefillThreshold;
-
private final UnAckedMessageTracker unAckedMessageTracker;
private final AcknowledgmentsGroupingTracker
acknowledgmentsGroupingTracker;
private final NegativeAcksTracker negativeAcksTracker;
@@ -254,7 +252,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.lookupDeadline = System.currentTimeMillis() +
client.getConfiguration().getLookupTimeoutMs();
this.partitionIndex = partitionIndex;
this.hasParentConsumer = hasParentConsumer;
- this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
this.priorityLevel = conf.getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition =
conf.getSubscriptionInitialPosition();
@@ -839,8 +836,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
boolean firstTimeConnect = subscribeFuture.complete(this);
// if the consumer is not partitioned or is re-connected and
is partitioned, we send the flow
// command to receive messages.
- if (!(firstTimeConnect && hasParentConsumer) &&
conf.getReceiverQueueSize() != 0) {
- increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
+ if (!(firstTimeConnect && hasParentConsumer) &&
getCurrentReceiverQueueSize() != 0) {
+ increaseAvailablePermits(cnx,
getCurrentReceiverQueueSize());
}
}).exceptionally((e) -> {
deregisterFromClientCnx();
@@ -1449,7 +1446,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return;
}
- if (conf.getReceiverQueueSize() == 0) {
+ if (getCurrentReceiverQueueSize() == 0) {
// call interceptor and complete received callback
trackMessage(message);
interceptAndComplete(message, receivedFuture);
@@ -1596,7 +1593,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
- while (available >= receiverQueueRefillThreshold && !paused) {
+ while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
sendFlowPermitsToBroker(currentCnx, available);
break;
@@ -1611,6 +1608,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
+ protected void setCurrentReceiverQueueSize(int newSize) {
+ // Updates the receiver queue size and adjusts the available permits by the size difference.
+ checkArgument(newSize > 0, "receiver queue size should be larger than 0");
+ // getAndSet returns the value that was actually replaced, so the delta below is computed
+ // against the size this call overwrote.
+ int oldSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize);
+ int delta = newSize - oldSize;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
+ topic, subscription, oldSize, newSize, delta);
+ }
+ // delta is negative when the queue shrinks; available permits may then drop below zero
+ // until enough already-received messages have been consumed.
+ increaseAvailablePermits(delta);
+ }
+
+ @Override
public void pause() {
paused = true;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 844590b..a92edff 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -85,10 +85,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// shared incoming queue was full
private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
- // Threshold for the shared queue. When the size of the shared queue goes
below the threshold, we are going to
- // resume receiving from the paused consumer partitions
- private final int sharedQueueResumeThreshold;
-
// sum of topicPartitions, simple topic has 1, partitioned topic equals to
partition number.
AtomicInteger allTopicPartitionsNumber;
@@ -142,7 +138,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
this.partitionedTopics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
- this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
this.allTopicPartitionsNumber = new AtomicInteger(0);
this.startMessageId = startMessageId != null
? new
BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
@@ -187,8 +182,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
.collect(Collectors.toList());
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
- if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
- setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+ if (allTopicPartitionsNumber.get() >
getCurrentReceiverQueueSize()) {
+
setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
}
setState(State.Ready);
// We have successfully created N consumers, so we can start
receiving messages now
@@ -244,7 +239,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
if (getState() == State.Ready) {
newConsumers.forEach(consumer -> {
-
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
conf.getReceiverQueueSize());
+
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
+ consumer.getCurrentReceiverQueueSize());
internalPinnedExecutor.execute(() ->
receiveMessageFromConsumer(consumer));
});
}
@@ -260,6 +256,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
messageReceived(consumer, message);
int size = incomingMessages.size();
+ int maxReceiverQueueSize = getCurrentReceiverQueueSize();
+ int sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
if (size >= maxReceiverQueueSize
|| (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
// mark this consumer to be resumed later: if No more space
left in shared queue,
@@ -319,7 +317,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
private void resumeReceivingFromPausedConsumersIfNeeded() {
- if (incomingMessages.size() <= sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty()) {
+ if (incomingMessages.size() <= getCurrentReceiverQueueSize() / 2 &&
!pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
@@ -1076,8 +1074,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
FutureUtil.waitForAll(futureList)
.thenAccept(finalFuture -> {
- if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
- setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+ if (allTopicPartitionsNumber.get() >
getCurrentReceiverQueueSize()) {
+
setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
}
// We have successfully created new consumers, so we can start
receiving messages for them
@@ -1470,4 +1468,15 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
acknowledgeCumulativeAsync(msg);
}
}
+
+ @Override
+ protected void setCurrentReceiverQueueSize(int newSize) {
+ // Updates the shared receiver queue size and re-evaluates whether paused consumers can resume.
+ checkArgument(newSize > 0, "receiver queue size should be larger than 0");
+ if (log.isDebugEnabled()) {
+ // Argument order follows the placeholders: the new size first, then the size being replaced.
+ log.debug("[{}][{}] setCurrentReceiverQueueSize={}, previous={}", topic, subscription,
+ newSize, getCurrentReceiverQueueSize());
+ }
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
+ // The resume check compares the shared queue against half of the current size, so a larger
+ // size may allow paused consumers to resume immediately.
+ resumeReceivingFromPausedConsumersIfNeeded();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index fe6b929..5c7b85b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -191,4 +192,10 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
return null;
});
}
+
+ @Override
+ protected void setCurrentReceiverQueueSize(int newSize) {
+ //receiver queue size is fixed as 0.
+ throw new NotImplementedException("Receiver queue size can't be
changed in ZeroQueueConsumerImpl");
+ }
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index ebcd5c8..fb38d48 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -217,4 +217,13 @@ public class ConsumerImplTest {
Assert.assertTrue(consumer.paused);
}
+
+ @Test
+ public void testMaxReceiverQueueSize() {
+ int size = consumer.getCurrentReceiverQueueSize();
+ int permits = consumer.getAvailablePermits();
+ consumer.setCurrentReceiverQueueSize(size + 100);
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size +
100);
+ Assert.assertEquals(consumer.getAvailablePermits(), permits + 100);
+ }
}