This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a26de54f1bfe68fdd3d2084dc4505fceb768f96c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue May 11 15:31:50 2021 -0700 [pulsar-broker] Dispatch messages to consumer with permits (#10417) * [pulsar-broker] Dispatch messages to consumer with permits * move test (cherry picked from commit 3550f2e7c1bf41ff548d2cf5d16ae50ebf4c0556) --- .../PersistentDispatcherMultipleConsumers.java | 25 ++++++--- .../pulsar/client/api/ConsumerRedeliveryTest.java | 59 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 11 ++++ 3 files changed, 89 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index f3c6d94..bba7c98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -249,8 +249,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul public synchronized void readMoreEntries() { // totalAvailablePermits may be updated by other threads - int currentTotalAvailablePermits = totalAvailablePermits; - if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { + int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); + int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); + if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) { int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits); if (-1 == messagesToRead) { @@ -510,7 +511,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul long totalMessagesSent = 0; long totalBytesSent = 0; - while 
(entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { + int firstAvailableConsumerPermits, currentTotalAvailablePermits; + boolean dispatchMessage; + while (entriesToDispatch > 0) { + firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); + currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); + dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0; + if (!dispatchMessage) { + break; + } Consumer c = getNextConsumer(); if (c == null) { // Do nothing, cursor will be rewind at reconnection @@ -668,16 +677,20 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul * @return */ protected boolean isAtleastOneConsumerAvailable() { + return getFirstAvailableConsumerPermits() > 0; + } + + protected int getFirstAvailableConsumerPermits() { if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) { // abort read if no consumers are connected or if disconnect is initiated - return false; + return 0; } for(Consumer consumer : consumerList) { if (isConsumerAvailable(consumer)) { - return true; + return consumer.getAvailablePermits(); } } - return false; + return 0; } private boolean isConsumerWritable() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index e828598..95cf90a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.api; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -29,6 +30,8 @@ import lombok.Cleanup; 
import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -40,6 +43,9 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; public class ConsumerRedeliveryTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(ConsumerRedeliveryTest.class); + @BeforeClass @Override protected void setup() throws Exception { @@ -179,4 +185,57 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { consumer.close(); } + /** + * Validates broker should dispatch messages to consumer which still has the permit to consume more messages. + * + * @throws Exception + */ + @Test + public void testConsumerWithPermitReceiveBatchMessages() throws Exception { + + log.info("-- Starting {} test --", methodName); + + final int queueSize = 10; + int batchSize = 100; + String subName = "my-subscriber-name"; + String topicName = "permitReceiveBatchMessages"+(UUID.randomUUID().toString()); + ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) + .receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName) + .subscribe(); + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName); + + producerBuilder.enableBatching(true); + producerBuilder.batchingMaxPublishDelay(2000, TimeUnit.MILLISECONDS); + producerBuilder.batchingMaxMessages(100); + + Producer<byte[]> producer = producerBuilder.create(); + for (int i = 0; i < batchSize; i++) { + String message = "my-message-" + i; + producer.sendAsync(message.getBytes()); + } + producer.flush(); + + for (int i = 0; i < queueSize; i++) { + String message = "my-message-" + i; + producer.sendAsync(message.getBytes()); + } + 
producer.flush(); + + retryStrategically((test) -> { + return consumer1.getTotalIncomingMessages() == batchSize; + }, 5, 2000); + + assertEquals(consumer1.getTotalIncomingMessages(), batchSize); + + ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) + .receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName) + .subscribe(); + + retryStrategically((test) -> { + return consumer2.getTotalIncomingMessages() == queueSize; + }, 5, 2000); + assertEquals(consumer2.getTotalIncomingMessages(), queueSize); + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 30c3d01..b296a21 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -925,6 +925,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T return INCOMING_MESSAGES_SIZE_UPDATER.get(this); } + public int getTotalIncomingMessages() { + return incomingMessages.size(); + } + + protected void clearIncomingMessages() { + // release messages if they are pooled messages + incomingMessages.forEach(Message::release); + incomingMessages.clear(); + resetIncomingMessageSize(); + } + protected abstract void completeOpBatchReceive(OpBatchReceive<T> op); private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);