codelipenghui commented on code in PR #17237:
URL: https://github.com/apache/pulsar/pull/17237#discussion_r952765957


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void 
testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
+    private void testMessageNotDuplicated(SubscriptionType subscription) 
throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, 
subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + 
brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = 
topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof 
PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = 
topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }

Review Comment:
   The comment says `Asynchronously produce messages`, but the code actually 
uses the synchronous `send` method.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void 
testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
+    private void testMessageNotDuplicated(SubscriptionType subscription) 
throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, 
subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + 
brokerRate);

Review Comment:
   Does the BUG only happen after the dispatch rate limit is enabled?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java:
##########
@@ -220,6 +220,93 @@ public void 
testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000)

Review Comment:
   @mattisonchao If the test can't reproduce the issue reliably, we can also 
add an `invocationCount` so that the test has to pass multiple times in the CI 
environment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to