wangjialing218 commented on a change in pull request #11325:
URL: https://github.com/apache/pulsar/pull/11325#discussion_r725836625
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
##########
@@ -310,6 +310,102 @@ public void
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
log.info("-- Exiting {} test --", methodName);
}
+ /**
+ * Verify whether the broker level rate-limiting is throttle
message-dispatching based on byte-rate or not
+ *
+ * <pre>
+ * 1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+ * 2. Start two consumers for two topics.
+ * 3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes,
thus 3000 bytes in total.
+ * 4. It should take up to 2 seconds to receive all messages of the two
topics.
+ * </pre>
+ *
+ * @param subscription
+ * @throws Exception
+ */
+ @Test(dataProvider = "subscriptions", timeOut = 8000)
+ public void
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType
subscription) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String namespace1 = "my-property/throttling_ns1";
+ final String topicName1 = BrokerTestUtil.newUniqueName("persistent://"
+ namespace1 + "/throttlingAll");
+ final String namespace2 = "my-property/throttling_ns2";
+ final String topicName2 = BrokerTestUtil.newUniqueName("persistent://"
+ namespace2 + "/throttlingAll");
+ final String subName = "my-subscriber-name-" + subscription;
+
+ final int byteRate = 1000;
+
admin.brokers().updateDynamicConfiguration("brokerDispatchThrottlingMaxByteRate",
"" + byteRate);
+ admin.namespaces().createNamespace(namespace1,
Sets.newHashSet("test"));
+ admin.namespaces().createNamespace(namespace2,
Sets.newHashSet("test"));
+
+ final int numProducedMessagesEachTopic = 15;
+ final int numProducedMessages = numProducedMessagesEachTopic * 2;
+ final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+ final AtomicInteger totalReceived = new AtomicInteger(0);
+ // enable throttling for nonBacklog consumers
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in topic1",
receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in topic2",
receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicName1).create();
+ Producer<byte[]> producer2 =
pulsarClient.newProducer().topic(topicName2).create();
+
+ boolean isMessageRateUpdate = false;
+ DispatchRateLimiter dispatchRateLimiter;
+ int retry = 5;
+ for (int i = 0; i < retry; i++) {
+ dispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+ if (dispatchRateLimiter != null
+ && dispatchRateLimiter.getDispatchRateOnByte() > 0) {
+ isMessageRateUpdate = true;
+ break;
+ } else {
+ if (i != retry - 1) {
+ Thread.sleep(100);
+ }
+ }
+ }
+ Assert.assertTrue(isMessageRateUpdate);
+
+ long start = System.currentTimeMillis();
+ // Asynchronously produce messages
+ for (int i = 0; i < numProducedMessagesEachTopic; i++) {
+ producer1.send(new byte[byteRate / 10]);
+ producer2.send(new byte[byteRate / 10]);
+ }
+ latch.await();
+ Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+ long end = System.currentTimeMillis();
+ log.info("-- time to receive all messages: {} ", end - start);
+
+ // first 10 messages, which equals receiverQueueSize, will not wait.
+ Assert.assertTrue((end - start) >= 2000);
+
+ consumer1.close();
+ consumer2.close();
+ producer1.close();
+ producer2.close();
+ log.info("-- Exiting {} test --", methodName);
Review comment:
For topic dispatch rate limiter, we can configure from namespace level
policies and topic level policies, and topic level policies will have a high
priority. That's because the enforcement is done at the topic level.
For broker level, topic level and subscription level dispatch rate limiter,
the enforcement is done at different level, so they have same priority and take
effect together.
I have add a test, when all these dispatch rate limiter are configured, the
minimum dispatch rate limiter should take effect in case of one consumer for
one topic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]