This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0a57e2c0ec9e536d53b4e20309bb64d0f5f0f17b Author: GuoJiwei <[email protected]> AuthorDate: Fri Aug 6 16:06:18 2021 +0800 Fix Consumer listener does not respect receiver queue size (#11455) Fixes #11008 (cherry picked from commit 09944d9891e0a433da9c7a1cf7b80acf5b3c1116) --- .../client/api/KeySharedSubscriptionTest.java | 6 +- .../client/api/SimpleProducerConsumerTest.java | 95 ++++++++++++++++++---- .../apache/pulsar/client/impl/ConsumerBase.java | 31 ++++--- 3 files changed, 102 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index e998a02..e785ac1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -525,9 +525,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { // Since only 1 out of 10 consumers is stuck, we should be able to receive ~90% messages, // plus or minus for some skew in the key distribution. 
- Thread.sleep(5000); - - assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3); + Awaitility.await().untilAsserted(() -> { + assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3); + }); } finally { for (PulsarClient c : clients) { c.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 5ebb2fb..6302cbd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4078,11 +4078,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { * <p> * Test starts consumer with 10 partitions where one of the partition listener gets blocked but that will not impact * processing of other 9 partitions and they will be processed successfully. + * Since #11455 (Fix Consumer listener does not respect receiver queue size), the purpose of this test has changed: + * it no longer verifies that different threads run the messageListener. Because the messageListener now has to + * be called one message at a time, all invocations may run on the same thread. 
* * @throws Exception */ @Test(timeOut = 20000) - public void testPartitionTopicsOnSeparateListner() throws Exception { + public void testPartitionTopicsOnSeparateListener() throws Exception { log.info("-- Starting {} test --", methodName); final String topicName = "persistent://my-property/my-ns/one-partitioned-topic"; @@ -4098,20 +4101,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // each partition int totalMessages = partitions * 2; - CountDownLatch latch = new CountDownLatch(totalMessages - 2); - CountDownLatch blockedMessageLatch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); AtomicInteger count = new AtomicInteger(); Set<String> listenerThreads = Sets.newConcurrentHashSet(); MessageListener<byte[]> messageListener = (c, m) -> { - if (count.incrementAndGet() == 1) { - try { - // blocking one of the partition's listener thread will not impact other topics - blockedMessageLatch.await(); - } catch (InterruptedException e) { - // Ok - } - } else { + if (count.incrementAndGet() == totalMessages) { latch.countDown(); } listenerThreads.add(Thread.currentThread().getName()); @@ -4130,9 +4125,79 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producer1.newMessage().value(("one-partitioned-topic-value-producer1-" + i).getBytes(UTF_8)).send(); } latch.await(); - assertEquals(listenerThreads.size(), partitions - 1); - // unblock the listener thread - blockedMessageLatch.countDown(); + assertTrue(listenerThreads.size() >= 1); log.info("-- Exiting {} test --", methodName); } -} \ No newline at end of file + + @Test(timeOut = 30000) + public void testShareConsumerWithMessageListener() throws Exception { + String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID(); + int total = 200; + Set<Integer> resultSet = Sets.newConcurrentHashSet(); + AtomicInteger r1 = new AtomicInteger(0); + AtomicInteger r2 = new AtomicInteger(0); + + @Cleanup + Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .maxPendingMessages(500) + .enableBatching(false) + .create(); + + @Cleanup + Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("shared") + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(10) + .consumerName("c1") + .messageListener((MessageListener<Integer>) (consumer, msg) -> { + log.info("c1 received : {}", msg.getValue()); + try { + resultSet.add(msg.getValue()); + r1.incrementAndGet(); + consumer.acknowledge(msg); + Thread.sleep(10); + } catch (InterruptedException ignore) { + // + } catch (PulsarClientException ex) { + log.error("c1 acknowledge error", ex); + } + }) + .subscribe(); + + for (int i = 0; i < total; i++) { + producer.newMessage() + .value(i) + .send(); + } + + @Cleanup + Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("shared") + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(10) + .consumerName("c2") + .messageListener((MessageListener<Integer>) (consumer, msg) -> { + log.info("c2 received : {}", msg.getValue()); + try { + resultSet.add(msg.getValue()); + r2.incrementAndGet(); + consumer.acknowledge(msg); + Thread.sleep(10); + } catch (InterruptedException ignore) { + // + } catch (PulsarClientException ex) { + log.error("c2 acknowledge error", ex); + } + }) + .subscribe(); + + Awaitility.await().untilAsserted(() -> { + assertTrue(r1.get() >= 1); + assertTrue(r2.get() >= 1); + assertEquals(resultSet.size(), total); + }); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 5c02904..9e3757b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import io.netty.util.Timeout; @@ -86,6 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected volatile long incomingMessagesSize = 0; protected volatile Timeout batchReceiveTimeout = null; protected final Lock reentrantLock = new ReentrantLock(); + private final AtomicInteger executorQueueSize = new AtomicInteger(0); protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, int receiverQueueSize, ExecutorProvider executorProvider, @@ -895,24 +897,26 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected void triggerListener() { // Trigger the notification on the message listener in a separate thread to avoid blocking the networking // thread while the message processing happens - Message<T> msg; - do { - try { - msg = internalReceive(0, TimeUnit.MILLISECONDS); + try { + // Control executor to call MessageListener one by one. 
+ if (executorQueueSize.get() < 1) { + final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); if (msg != null) { - final Message<T> finalMsg = msg; + executorQueueSize.incrementAndGet(); if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { - executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() -> - callMessageListener(finalMsg)); + executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> + callMessageListener(msg)); } else { - getExecutor(msg).execute(() -> callMessageListener(finalMsg)); + getExecutor(msg).execute(() -> { + callMessageListener(msg); + }); } } - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; } - } while (msg != null); + } catch (PulsarClientException e) { + log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); + return; + } if (log.isDebugEnabled()) { log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); @@ -929,6 +933,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, msg.getMessageId(), t); + } finally { + executorQueueSize.decrementAndGet(); + triggerListener(); } }
