wangjialing218 commented on a change in pull request #11325:
URL: https://github.com/apache/pulsar/pull/11325#discussion_r725830410



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
##########
@@ -310,6 +310,102 @@ public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
         log.info("-- Exiting {} test --", methodName);
     }
 
+    /**
+     * Verify whether the broker level rate-limiting is throttle 
message-dispatching based on byte-rate or not
+     *
+     * <pre>
+     *  1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+     *  2. Start two consumers for two topics.
+     *  3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, 
thus 3000 bytes in total.
+     *  4. It should take up to 2 seconds to receive all messages of the two 
topics.
+     * </pre>
+     *
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions", timeOut = 8000)
+    public void 
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace1 = "my-property/throttling_ns1";
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace1 + "/throttlingAll");
+        final String namespace2 = "my-property/throttling_ns2";
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace2 + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        final int byteRate = 1000;
+        
admin.brokers().updateDynamicConfiguration("brokerDispatchThrottlingMaxByteRate",
 "" + byteRate);
+        admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
+
+        final int numProducedMessagesEachTopic = 15;
+        final int numProducedMessages = numProducedMessagesEachTopic * 2;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic1", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic2", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
+
+        boolean isMessageRateUpdate = false;
+        DispatchRateLimiter dispatchRateLimiter;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {
+            dispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            if (dispatchRateLimiter != null
+                    && dispatchRateLimiter.getDispatchRateOnByte() > 0) {
+                isMessageRateUpdate = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(isMessageRateUpdate);
+
+        long start = System.currentTimeMillis();
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessagesEachTopic; i++) {

Review comment:
       Could you point out the thread which need to stop?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to