315157973 commented on a change in pull request #11325:
URL: https://github.com/apache/pulsar/pull/11325#discussion_r724747615



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
##########
@@ -317,6 +340,9 @@ public static DispatchRateImpl 
getPoliciesDispatchRate(final String cluster,
      * @return
      */
     public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
+        if (type == Type.BROKER) {

Review comment:
       According to the comment of this method, this judgment should not be put 
into this method.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
##########
@@ -173,12 +188,20 @@ public void updateDispatchRate() {
         }
 
         updateDispatchRate(dispatchRate.get());
-        log.info("[{}] configured {} message-dispatch rate at broker {}", 
this.topicName, type, dispatchRate.get());
-    }
+        if (type == Type.BROKER) {

Review comment:
       The existing log has been able to print out the type and rate, why we 
need to output the log separately?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -238,6 +240,27 @@ public void resetCloseFuture() {
         // noop
     }
 
+    protected abstract void reScheduleRead();
+
+    protected boolean reachDispatchRateLimit(DispatchRateLimiter 
dispatchRateLimiter,
+                                             MutablePair<Integer, Long> 
calculateToRead) {
+        if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
+            if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
+                reScheduleRead();
+                return true;
+            } else {
+                // update messagesToRead according to available dispatch rate 
limit.
+                Pair<Integer, Long> calculateResult = 
computeReadLimits(calculateToRead.getLeft(),
+                        (int) 
dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                        calculateToRead.getRight(),
+                        
dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
+                calculateToRead.setLeft(calculateResult.getLeft());
+                calculateToRead.setRight(calculateResult.getRight());

Review comment:
       It looks like it can be abstracted to another method, executed when 
reachDispatchRateLimit=false

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
##########
@@ -310,6 +310,102 @@ public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
         log.info("-- Exiting {} test --", methodName);
     }
 
+    /**
+     * Verify whether the broker level rate-limiting is throttle 
message-dispatching based on byte-rate or not
+     *
+     * <pre>
+     *  1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+     *  2. Start two consumers for two topics.
+     *  3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, 
thus 3000 bytes in total.
+     *  4. It should take up to 2 seconds to receive all messages of the two 
topics.
+     * </pre>
+     *
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions", timeOut = 8000)
+    public void 
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace1 = "my-property/throttling_ns1";
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace1 + "/throttlingAll");
+        final String namespace2 = "my-property/throttling_ns2";
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace2 + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        final int byteRate = 1000;
+        
admin.brokers().updateDynamicConfiguration("brokerDispatchThrottlingMaxByteRate",
 "" + byteRate);
+        admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
+
+        final int numProducedMessagesEachTopic = 15;
+        final int numProducedMessages = numProducedMessagesEachTopic * 2;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic1", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic2", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
+
+        boolean isMessageRateUpdate = false;
+        DispatchRateLimiter dispatchRateLimiter;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {
+            dispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            if (dispatchRateLimiter != null
+                    && dispatchRateLimiter.getDispatchRateOnByte() > 0) {
+                isMessageRateUpdate = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(isMessageRateUpdate);
+
+        long start = System.currentTimeMillis();
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessagesEachTopic; i++) {
+            producer1.send(new byte[byteRate / 10]);
+            producer2.send(new byte[byteRate / 10]);
+        }
+        latch.await();
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+        long end = System.currentTimeMillis();
+        log.info("-- time to receive all messages: {} ", end - start);
+
+        // first 10 messages, which equals receiverQueueSize, will not wait.
+        Assert.assertTrue((end - start) >= 2000);
+
+        consumer1.close();
+        consumer2.close();
+        producer1.close();
+        producer2.close();
+        log.info("-- Exiting {} test --", methodName);

Review comment:
       Please add a unit test that test the priority of policies at different 
levels
   

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
##########
@@ -310,6 +310,102 @@ public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
         log.info("-- Exiting {} test --", methodName);
     }
 
+    /**
+     * Verify whether the broker level rate-limiting is throttle 
message-dispatching based on byte-rate or not
+     *
+     * <pre>
+     *  1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+     *  2. Start two consumers for two topics.
+     *  3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, 
thus 3000 bytes in total.
+     *  4. It should take up to 2 seconds to receive all messages of the two 
topics.
+     * </pre>
+     *
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions", timeOut = 8000)
+    public void 
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace1 = "my-property/throttling_ns1";
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace1 + "/throttlingAll");
+        final String namespace2 = "my-property/throttling_ns2";
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace2 + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        final int byteRate = 1000;
+        
admin.brokers().updateDynamicConfiguration("brokerDispatchThrottlingMaxByteRate",
 "" + byteRate);
+        admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
+
+        final int numProducedMessagesEachTopic = 15;
+        final int numProducedMessages = numProducedMessagesEachTopic * 2;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic1", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic2", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
+
+        boolean isMessageRateUpdate = false;
+        DispatchRateLimiter dispatchRateLimiter;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {

Review comment:
       we can use `awaitility`

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -296,55 +303,47 @@ public synchronized void readMoreEntries() {
             messagesToRead = 1;
         }
 
+        MutablePair<Integer, Long> calculateToRead = 
MutablePair.of(messagesToRead, bytesToRead);

Review comment:
       `calculateToRead ` is a high-frequency operation, I think we can avoid 
creating new objects, just pass two params to `reachDispatchRateLimit `

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
##########
@@ -310,6 +310,102 @@ public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
         log.info("-- Exiting {} test --", methodName);
     }
 
+    /**
+     * Verify whether the broker level rate-limiting is throttle 
message-dispatching based on byte-rate or not
+     *
+     * <pre>
+     *  1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+     *  2. Start two consumers for two topics.
+     *  3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, 
thus 3000 bytes in total.
+     *  4. It should take up to 2 seconds to receive all messages of the two 
topics.
+     * </pre>
+     *
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions", timeOut = 8000)
+    public void 
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace1 = "my-property/throttling_ns1";
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace1 + "/throttlingAll");
+        final String namespace2 = "my-property/throttling_ns2";
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace2 + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        final int byteRate = 1000;
+        
admin.brokers().updateDynamicConfiguration("brokerDispatchThrottlingMaxByteRate",
 "" + byteRate);
+        admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
+
+        final int numProducedMessagesEachTopic = 15;
+        final int numProducedMessages = numProducedMessagesEachTopic * 2;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic1", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic2", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
+
+        boolean isMessageRateUpdate = false;
+        DispatchRateLimiter dispatchRateLimiter;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {
+            dispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            if (dispatchRateLimiter != null
+                    && dispatchRateLimiter.getDispatchRateOnByte() > 0) {
+                isMessageRateUpdate = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(isMessageRateUpdate);
+
+        long start = System.currentTimeMillis();
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessagesEachTopic; i++) {

Review comment:
       I suggest you stop the thread pool that refreshes the quota at first,  
otherwise this unit test will often fail because we cannot predict the running 
of the thread

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
##########
@@ -173,12 +188,20 @@ public void updateDispatchRate() {
         }
 
         updateDispatchRate(dispatchRate.get());
-        log.info("[{}] configured {} message-dispatch rate at broker {}", 
this.topicName, type, dispatchRate.get());
-    }
+        if (type == Type.BROKER) {
+            log.info("configured broker message-dispatch rate {}", 
dispatchRate.get());
+        } else {
+            log.info("[{}] configured {} message-dispatch rate at broker {}", 
this.topicName, type, dispatchRate.get());
+        }
+}
 
     public static boolean isDispatchRateNeeded(BrokerService brokerService, 
Optional<Policies> policies,
             String topicName, Type type) {
         final ServiceConfiguration serviceConfig = 
brokerService.pulsar().getConfiguration();
+        if (type == Type.BROKER) {

Review comment:
       Whether the priority of Broker level should be after Topic level?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to