jianyun8023 commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r483568822
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -590,4 +591,121 @@ public void testRemovePublishRate() throws Exception {
admin.topics().deletePartitionedTopic(testTopic, true);
}
+
+ @Test
+ public void testCheckMaxConsumers() throws Exception {
+ Integer maxProducers = new Integer(-1);
+ log.info("MaxConsumers: {} will set to the topic: {}", maxProducers,
testTopic);
+ try {
+ admin.topics().setMaxConsumers(testTopic, maxProducers);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 412);
+ }
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
+
+ @Test
+ public void testSetMaxConsumers() throws Exception {
+ admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
+ log.info("MaxConsumers: {} will set to the namespace: {}", 1,
myNamespace);
+ Integer maxConsumers = 2;
+ log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers,
persistenceTopic);
+ admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
+ Thread.sleep(3000);
+
+ admin.topics().createPartitionedTopic(persistenceTopic, 2);
+ Consumer consumer1 = null;
+ Consumer consumer2 = null;
+ Consumer consumer3 = null;
+ try {
+ consumer1 =
pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
+ } catch (PulsarClientException e) {
+ Assert.fail();
+ }
+ try {
+ consumer2 =
pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
+ } catch (PulsarClientException e) {
+ Assert.fail();
+ }
+ try {
+ consumer3 =
pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
+ Assert.fail();
+ } catch (PulsarClientException e) {
+ log.info("Topic reached max consumers limit");
+ }
+ Assert.assertNotNull(consumer1);
+ Assert.assertNotNull(consumer2);
+ Assert.assertNull(consumer3);
+ consumer1.close();
+ consumer2.close();
+
+ Integer getMaxConsumers =
admin.topics().getMaxConsumers(persistenceTopic);
+ log.info("MaxConsumers {} get on topic: {}", getMaxConsumers,
persistenceTopic);
+ Assert.assertEquals(getMaxConsumers, maxConsumers);
+
+ admin.topics().deletePartitionedTopic(persistenceTopic, true);
+ admin.topics().deletePartitionedTopic(testTopic, true);
Review comment:
Please remove `admin.topics().deletePartitionedTopic(testTopic, true);` — this test only works with `persistenceTopic`, so it should not delete `testTopic`.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2417,6 +2417,53 @@ protected void internalGetMaxProducers(AsyncResponse
asyncResponse) {
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
}
+ protected void internalGetMaxConsumers(AsyncResponse asyncResponse) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<Integer> maxConsumers = getTopicPolicies(topicName)
+ .map(TopicPolicies::getMaxConsumerPerTopic);
+ if (!maxConsumers.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(maxConsumers.get());
+ }
+ }
Review comment:
Using `AsyncResponse` here is not recommended; just return the value
directly.
For more information, see https://github.com/apache/pulsar/issues/7884
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]