This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4c508c5 support subscription dispatch rate on topic level (#8087)
4c508c5 is described below
commit 4c508c586fb9dcd08d7e13dfe4aeca19a790f51c
Author: hangc0276 <[email protected]>
AuthorDate: Tue Sep 22 07:47:21 2020 +0800
support subscription dispatch rate on topic level (#8087)
### Modifications
Support setting the subscription dispatch rate at the topic level.
Support getting the subscription dispatch rate at the topic level.
Support removing the subscription dispatch rate at the topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 +++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 87 ++++++++++++++
.../service/persistent/DispatchRateLimiter.java | 34 +++---
.../broker/service/persistent/PersistentTopic.java | 16 ++-
.../broker/admin/TopicPoliciesDisableTest.java | 21 ++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 128 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 64 +++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 76 ++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 59 ++++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 5 +
10 files changed, 518 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 749c493..13c592e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3261,6 +3261,48 @@ public class PersistentTopicsBase extends AdminResource {
}
+ /**
+ * Returns the subscription dispatch rate stored in the topic-level policies of
+ * {@code topicName}.
+ * <p>
+ * Validates tenant admin access, policies read-only access and, for global topics,
+ * namespace ownership, then calls {@code checkTopicLevelPolicyEnable()} before reading.
+ *
+ * @return the stored rate; empty when the topic has no policies or the policies
+ * carry no subscription dispatch rate
+ */
+ protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ // Optional.map yields empty when the getter returns null, i.e. no rate is set.
+ return
getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
+ }
+
+ /**
+ * Stores {@code dispatchRate} as the subscription dispatch rate in the topic-level
+ * policies of {@code topicName}.
+ * <p>
+ * Runs the same validations as the getter. A {@code null} rate is a no-op that
+ * completes immediately without touching the stored policies.
+ *
+ * @param dispatchRate rate to store; {@code null} leaves the policies unchanged
+ * @return the future returned by {@code updateTopicPoliciesAsync}, or an
+ * already-completed future when {@code dispatchRate} is {@code null}
+ */
+ protected CompletableFuture<Void>
internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ if (dispatchRate == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // Start from the existing policies so other topic-level settings are kept;
+ // create a fresh policies object only when the topic has none yet.
+ TopicPolicies topicPolicies = getTopicPolicies(topicName)
+ .orElseGet(TopicPolicies::new);
+ topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
+ }
+
+ /**
+ * Clears the subscription dispatch rate from the topic-level policies of
+ * {@code topicName}.
+ * <p>
+ * Runs the same validations as the getter. When the topic has no policies there is
+ * nothing to clear and the call completes immediately.
+ *
+ * @return the future returned by {@code updateTopicPoliciesAsync}, or an
+ * already-completed future when the topic has no policies
+ */
+ protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate()
{
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+ if (!topicPolicies.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // A null rate is what isSubscriptionDispatchRateSet() reports as "not set".
+ topicPolicies.get().setSubscriptionDispatchRate(null);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
+ }
+
+
protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5e93c91..6762538 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2088,6 +2088,93 @@ public class PersistentTopics extends
PersistentTopicsBase {
}
+ /**
+ * REST endpoint returning the topic-level subscription dispatch rate.
+ * <p>
+ * Resumes the response with the stored {@code DispatchRate}, or with 204 No Content
+ * when none is set. A {@code RestException} is passed through unchanged; any other
+ * exception is wrapped in a {@code RestException}.
+ *
+ * @param asyncResponse response handle that is resumed with the result or the error
+ * @param tenant tenant path parameter
+ * @param namespace namespace path parameter
+ * @param encodedTopic topic path parameter, still URL-encoded
+ */
@GET
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+ @ApiOperation(value = "Get subscription message dispatch rate
configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getSubscriptionDispatchRate(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<DispatchRate> dispatchRate =
internalGetSubscriptionDispatchRate();
+ if (!dispatchRate.isPresent()) {
+ // No topic-level rate configured: answer with an empty 204 response.
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(dispatchRate.get());
+ }
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ /**
+ * REST endpoint that stores the subscription dispatch rate in the topic-level
+ * policies of the given topic.
+ * <p>
+ * The response is resumed from the completion callback of
+ * {@code internalSetSubscriptionDispatchRate}: a failure is logged together with its
+ * exception and returned as a {@code RestException}; success answers with
+ * 204 No Content.
+ *
+ * @param asyncResponse response handle resumed once the update completes
+ * @param tenant tenant path parameter
+ * @param namespace namespace path parameter
+ * @param encodedTopic topic path parameter, still URL-encoded
+ * @param dispatchRate rate to store for the subscriptions of the topic
+ */
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+ @ApiOperation(value = "Set subscription message dispatch rate
configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setSubscriptionDispatchRate(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @ApiParam(value = "Subscription message
dispatch rate for the specified topic") DispatchRate dispatchRate) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex)
-> {
+ if (ex instanceof RestException) {
+ log.error("Failed to set topic: {} subscription dispatch
rate", topicName.getLocalName(), ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ // Pass the exception to the logger so the stack trace is not lost.
+ log.error("Failed to set topic: {} subscription dispatch
rate", topicName.getLocalName(), ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ try {
+ log.info("[{}] Successfully set topic subscription
dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName(),
+ jsonMapper().writeValueAsString(dispatchRate));
+ } catch (JsonProcessingException ignored) {
+ // Serialization only feeds the log line; it must not fail the request.
+ }
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ /**
+ * REST endpoint that removes the subscription dispatch rate from the topic-level
+ * policies of the given topic.
+ * <p>
+ * The response is resumed from the completion callback of
+ * {@code internalRemoveSubscriptionDispatchRate}: a failure is logged and returned as
+ * a {@code RestException}; success answers with 204 No Content.
+ *
+ * @param asyncResponse response handle resumed once the removal completes
+ * @param tenant tenant path parameter
+ * @param namespace namespace path parameter
+ * @param encodedTopic topic path parameter, still URL-encoded
+ */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+ @ApiOperation(value = "Remove subscription message dispatch rate
configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled,
please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String
encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove topic: {} subscription dispatch
rate", topicName.getLocalName(), ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully remove topic subscription dispatch
rate: tenant={}, namespace={}, topic={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Get compaction threshold configuration for
specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 4cc711f..29d1107 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -149,7 +149,7 @@ public class DispatchRateLimiter {
* default broker dispatch-throttling-rate
*/
public void updateDispatchRate() {
- Optional<DispatchRate> dispatchRate =
getSystemTopicDispatchRate(brokerService, topicName);
+ Optional<DispatchRate> dispatchRate =
getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
dispatchRate
=Optional.ofNullable(getPoliciesDispatchRate(brokerService));
@@ -165,29 +165,37 @@ public class DispatchRateLimiter {
public static boolean isDispatchRateNeeded(BrokerService brokerService,
Optional<Policies> policies,
String topicName, Type type) {
final ServiceConfiguration serviceConfig =
brokerService.pulsar().getConfiguration();
- if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC)
{
- Optional<DispatchRate> dispatchRate =
getSystemTopicDispatchRate(brokerService, topicName);
- if (dispatchRate.isPresent()) {
- return true;
- }
+ Optional<DispatchRate> dispatchRate =
getTopicPolicyDispatchRate(brokerService, topicName, type);
+ if (dispatchRate.isPresent()) {
+ return true;
}
policies = policies.isPresent() ? policies :
getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}
- public static Optional<DispatchRate>
getSystemTopicDispatchRate(BrokerService brokerService, String topicName) {
+ public static Optional<DispatchRate>
getTopicPolicyDispatchRate(BrokerService brokerService,
+ String
topicName, Type type) {
Optional<DispatchRate> dispatchRate = Optional.empty();
final ServiceConfiguration serviceConfiguration =
brokerService.pulsar().getConfiguration();
- if (serviceConfiguration.isTopicLevelPoliciesEnabled()) {
+ if (serviceConfiguration.isSystemTopicEnabled() &&
serviceConfiguration.isTopicLevelPoliciesEnabled()) {
try {
- dispatchRate = Optional.ofNullable(brokerService.pulsar()
-
.getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getDispatchRate);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException
e){
+ switch (type) {
+ case TOPIC:
+ dispatchRate =
Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+ .getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getDispatchRate);
+ break;
+ case SUBSCRIPTION:
+ dispatchRate =
Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+ .getTopicPolicies(TopicName.get(topicName)))
+
.map(TopicPolicies::getSubscriptionDispatchRate);
+ break;
+ }
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException
e) {
log.debug("Topic {} policies cache have not init.", topicName);
} catch (Exception e) {
- log.debug("[{}] Failed to get topic policies. Exception: {}",
topicName, e);
+ log.debug("[{}] Failed to get topic dispatch rate. ",
topicName, e);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f83677e..91c8b7d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1873,7 +1873,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
- if (dispatcher != null) {
+ // If the topic-level policy already exists, the namespace-level
policy cannot override
+ // the topic-level policy.
+ if (dispatcher != null && (topicPolicies == null ||
!topicPolicies.isSubscriptionDispatchRateSet())) {
dispatcher.getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.onPoliciesUpdate(data));
}
});
@@ -2389,6 +2391,18 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
});
+ // Push the topic-level subscription dispatch rate (or, when it is not set, the
+ // rate resolved by the limiter itself) to every subscription's rate limiter.
+ subscriptions.forEach((subName, sub) -> {
+ sub.getConsumers().forEach(Consumer::checkPermissions);
+ Dispatcher dispatcher = sub.getDispatcher();
+ // getDispatcher() can return null (the namespace-policy update in this class
+ // guards against it as well); skip such subscriptions instead of throwing an NPE.
+ if (dispatcher == null) {
+ return;
+ }
+ if (policies.isSubscriptionDispatchRateSet()) {
+ dispatcher.getRateLimiter().ifPresent(rateLimiter ->
+
rateLimiter.updateDispatchRate(policies.getSubscriptionDispatchRate()));
+ } else {
+ dispatcher.getRateLimiter().ifPresent(rateLimiter ->
+ rateLimiter.updateDispatchRate());
+ }
+ });
+
if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index f0f8d79..2309107 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -154,6 +154,27 @@ public class TopicPoliciesDisableTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testSubscriptionDispatchRateDisabled() throws Exception {
+ // Both the set and the get admin call are expected to be rejected with HTTP 405,
+ // the status the REST API documents for "Topic level policy is disabled".
+ DispatchRate dispatchRate = new DispatchRate(1000,
+ 1020*1024, 1);
+ log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate,
testTopic);
+
+ try {
+ admin.topics().setSubscriptionDispatchRate(testTopic,
dispatchRate);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().getSubscriptionDispatchRate(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
+
+ @Test
public void testCompactionThresholdDisabled() {
Long compactionThreshold = 10000L;
log.info("Compaction threshold: {} will set to the topic: {}",
compactionThreshold, testTopic);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c0e7655..803cc36 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -561,6 +561,134 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testGetSetSubscriptionDispatchRate() throws Exception {
+ // Sets a topic-level subscription dispatch rate, then checks that a new
+ // subscription's rate limiter and the admin getter both report it.
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ Thread.sleep(3000);
+
+ DispatchRate dispatchRate = new DispatchRate(1000,
+ 1024 * 1024, 1);
+ log.info("Subscription Dispatch Rate: {} will set to the topic: {}",
dispatchRate, topic);
+
+ admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
+ log.info("Subscription dispatch rate set success on topic: {}", topic);
+
+ Thread.sleep(3000);
+
+ String subscriptionName = "test_subscription_rate";
+ Consumer consumer =
pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+ Thread.sleep(3000);
+
+ DispatchRateLimiter dispatchRateLimiter =
pulsar.getBrokerService().getTopicIfExists(topic)
+
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertNotNull(dispatchRateLimiter);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(),
dispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
dispatchRate.dispatchThrottlingRateInMsg);
+
+
+ DispatchRate getDispatchRate =
admin.topics().getSubscriptionDispatchRate(topic);
+ log.info("Subscription dispatch rate: {} get on topic: {}",
getDispatchRate, topic);
+ Assert.assertEquals(getDispatchRate, dispatchRate);
+
+ // The producer was already closed above; release the consumer instead.
+ consumer.close();
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testRemoveSubscriptionDispatchRate() throws Exception {
+ // Sets a topic-level subscription dispatch rate, removes it again and checks
+ // that neither the admin getter nor the rate limiter still reports it.
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ Thread.sleep(3000);
+
+ DispatchRate dispatchRate = new DispatchRate(1000,
+ 1024 * 1024, 1);
+ log.info("Subscription Dispatch Rate: {} will set to the topic: {}",
dispatchRate, topic);
+
+ admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
+ log.info("Subscription dispatch rate set success on topic: {}", topic);
+
+ Thread.sleep(3000);
+
+ String subscriptionName = "test_subscription_rate";
+ Consumer consumer =
pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+ Thread.sleep(3000);
+
+ DispatchRateLimiter dispatchRateLimiter =
pulsar.getBrokerService().getTopicIfExists(topic)
+
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertNotNull(dispatchRateLimiter);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(),
dispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
dispatchRate.dispatchThrottlingRateInMsg);
+
+ DispatchRate getDispatchRate =
admin.topics().getSubscriptionDispatchRate(topic);
+ log.info("Subscription dispatch rate: {} get on topic: {}",
getDispatchRate, topic);
+
+ // remove subscription dispatch rate
+ admin.topics().removeSubscriptionDispatchRate(topic);
+ Thread.sleep(3000);
+ getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+ log.info("Subscription dispatch rate get on topic is {} after remove",
getDispatchRate);
+ Assert.assertNull(getDispatchRate);
+
+ dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
dispatchRate.dispatchThrottlingRateInMsg);
+ Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnByte(),
dispatchRate.dispatchThrottlingRateInByte);
+
+ // The producer was already closed above; release the consumer instead.
+ consumer.close();
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel()
throws Exception {
+ // Checks precedence: the namespace-level rate applies first, the topic-level
+ // rate replaces it once set, and the namespace-level rate applies again after
+ // the topic-level rate is removed.
+ // NOTE(review): the consumer is never closed and the namespace-level rate is
+ // not reset at the end of the test - confirm this cannot affect other tests.
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ Thread.sleep(3000);
+
+ // set namespace level subscription dispatch rate
+ DispatchRate namespaceDispatchRate = new DispatchRate(100, 1024 *
1024, 1);
+ admin.namespaces().setSubscriptionDispatchRate(myNamespace,
namespaceDispatchRate);
+ Thread.sleep(3000);
+
+ String subscriptionName = "test_subscription_rate";
+ Consumer consumer =
pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+
+ // get subscription dispatch Rate limiter
+ DispatchRateLimiter dispatchRateLimiter =
pulsar.getBrokerService().getTopicIfExists(topic)
+
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
namespaceDispatchRate.dispatchThrottlingRateInMsg);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(),
namespaceDispatchRate.dispatchThrottlingRateInByte);
+
+ // set topic level subscription dispatch rate
+ DispatchRate topicDispatchRate = new DispatchRate(200, 2 * 1024 *
1024, 1);
+ admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
+ Thread.sleep(3000);
+
+ // get subscription dispatch rate limiter
+ dispatchRateLimiter =
pulsar.getBrokerService().getTopicIfExists(topic).get().get()
+
.getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(),
topicDispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
topicDispatchRate.dispatchThrottlingRateInMsg);
+
+ // remove topic level subscription dispatch rate limiter
+ admin.topics().removeSubscriptionDispatchRate(topic);
+ Thread.sleep(3000);
+
+ // get subscription dispatch rate limiter
+ dispatchRateLimiter =
pulsar.getBrokerService().getTopicIfExists(topic).get().get()
+
.getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(),
namespaceDispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
namespaceDispatchRate.dispatchThrottlingRateInMsg);
+
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
public void testGetSetCompactionThreshold() throws Exception {
long compactionThreshold = 100000;
log.info("Compaction threshold: {} will set to the topic: {}",
compactionThreshold, testTopic);
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 1994ee2..e039083 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2008,6 +2008,70 @@ public interface Topics {
CompletableFuture<Void> removeDispatchRateAsync(String topic) throws
PulsarAdminException;
/**
+ * Set subscription-message-dispatch-rate for the topic.
+ * <p>
+ * The rate is stored as a topic-level policy and applies to the subscriptions
+ * of this topic.
+ *
+ * @param topic
+ * Topic name
+ * @param dispatchRate
+ * Dispatch rate to apply to the subscriptions of the topic
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate)
throws PulsarAdminException;
+
+ /**
+ * Set subscription-message-dispatch-rate for the topic asynchronously.
+ * <p>
+ * The rate is stored as a topic-level policy and applies to the subscriptions
+ * of this topic.
+ *
+ * @param topic
+ * Topic name
+ * @param dispatchRate
+ * Dispatch rate to apply to the subscriptions of the topic
+ * @return a future that completes when the request has been processed
+ */
+ CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic,
DispatchRate dispatchRate);
+
+ /**
+ * Get subscription-message-dispatch-rate for the topic.
+ * <p>
+ * Returns the topic-level policy only.
+ *
+ * @param topic
+ * Topic name
+ * @return the subscription dispatch rate configured on the topic, or
+ * {@code null} when no topic-level rate is set
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ DispatchRate getSubscriptionDispatchRate(String topic) throws
PulsarAdminException;
+
+ /**
+ * Get subscription-message-dispatch-rate for the topic asynchronously.
+ * <p>
+ * Returns the topic-level policy only.
+ *
+ * @param topic
+ * Topic name
+ * @return a future holding the subscription dispatch rate configured on the
+ * topic, or {@code null} when no topic-level rate is set
+ */
+ CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String
topic);
+
+ /**
+ * Remove the topic-level subscription-message-dispatch-rate of a topic.
+ *
+ * @param topic
+ * Topic name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeSubscriptionDispatchRate(String topic) throws
PulsarAdminException;
+
+ /**
+ * Remove the topic-level subscription-message-dispatch-rate of a topic
+ * asynchronously.
+ *
+ * @param topic
+ * Topic name
+ * @return a future that completes when the request has been processed
+ */
+ CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
+
+ /**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index eb30d9c..9238c07 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2165,6 +2165,82 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public DispatchRate getSubscriptionDispatchRate(String topic) throws
PulsarAdminException {
+ // Blocking wrapper: waits up to readTimeoutMs for the async variant.
+ try {
+ return
getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // NOTE(review): assumes the async call only ever fails with a
+ // PulsarAdminException; any other cause would raise ClassCastException here.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before translating the exception.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate>
getSubscriptionDispatchRateAsync(String topic) {
+ // Issues a GET on the topic's "subscriptionDispatchRate" path and bridges the
+ // JAX-RS callback to a CompletableFuture.
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+ final CompletableFuture<DispatchRate> future = new
CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ // NOTE(review): converts the cause of the reported failure, not the
+ // failure itself - presumably the callback wraps the real error; confirm.
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setSubscriptionDispatchRate(String topic, DispatchRate
dispatchRate) throws PulsarAdminException {
+ // Blocking wrapper: waits up to readTimeoutMs for the async variant.
+ try {
+ setSubscriptionDispatchRateAsync(topic,
dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // NOTE(review): assumes the async call only ever fails with a
+ // PulsarAdminException; any other cause would raise ClassCastException here.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before translating the exception.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String
topic, DispatchRate dispatchRate) {
+ // POSTs the rate as a JSON entity to the topic's "subscriptionDispatchRate" path.
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeSubscriptionDispatchRate(String topic) throws
PulsarAdminException {
+ // Blocking wrapper: waits up to readTimeoutMs for the async variant.
+ try {
+ removeSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // NOTE(review): assumes the async call only ever fails with a
+ // PulsarAdminException; any other cause would raise ClassCastException here.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before translating the exception.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String
topic) {
+ // Sends a DELETE to the topic's "subscriptionDispatchRate" path.
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public Long getCompactionThreshold(String topic) throws
PulsarAdminException {
try {
return getCompactionThresholdAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9649cad..1c6a131 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -133,9 +133,15 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-offload-policies", new
GetOffloadPolicies());
jcommander.addCommand("set-offload-policies", new
SetOffloadPolicies());
jcommander.addCommand("remove-offload-policies", new
RemoveOffloadPolicies());
+
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new
RemoveDispatchRate());
+
+ jcommander.addCommand("get-subscription-dispatch-rate", new
GetSubscriptionDispatchRate());
+ jcommander.addCommand("set-subscription-dispatch-rate", new
SetSubscriptionDispatchRate());
+ jcommander.addCommand("remove-subscription-dispatch-rate", new
RemoveSubscriptionDispatchRate());
+
jcommander.addCommand("get-compaction-threshold", new
GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new
SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new
RemoveCompactionThreshold());
@@ -1478,6 +1484,59 @@ public class CmdTopics extends CmdBase {
}
}
+ // CLI command "get-subscription-dispatch-rate": prints the topic-level
+ // subscription dispatch rate returned by the admin client.
+ @Parameters(commandDescription = "Get subscription message-dispatch-rate
for a topic")
+ private class GetSubscriptionDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getSubscriptionDispatchRate(persistentTopic));
+ }
+ }
+
+ // CLI command "set-subscription-dispatch-rate": builds a DispatchRate from the
+ // options below and stores it as the topic-level subscription dispatch rate.
+ @Parameters(commandDescription = "Set subscription message-dispatch-rate
for a topic")
+ private class SetSubscriptionDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ // Message rate; defaults to -1 when the option is omitted.
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be
overwrite if not passed)\n", required = false)
+ private int msgDispatchRate = -1;
+
+ // Byte rate; defaults to -1 when the option is omitted.
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be
overwrite if not passed)\n", required = false)
+ private long byteDispatchRate = -1;
+
+ // Rate period in seconds; defaults to 1 when the option is omitted.
+ @Parameter(names = { "--dispatch-rate-period",
+ "-dt" }, description = "dispatch-rate-period in second type
(default 1 second will be overwrite if not passed)\n", required = false)
+ private int dispatchRatePeriodSec = 1;
+
+ // When set, the rate is interpreted relative to the publish rate (see the
+ // option description).
+ @Parameter(names = { "--relative-to-publish-rate",
+ "-rp" }, description = "dispatch rate relative to publish-rate
(if publish-relative flag is enabled then broker will apply throttling value to
(publish-rate + dispatch rate))\n", required = false)
+ private boolean relativeToPublishRate = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setSubscriptionDispatchRate(persistentTopic,
+ new DispatchRate(msgDispatchRate, byteDispatchRate,
dispatchRatePeriodSec, relativeToPublishRate));
+ }
+ }
+
+ // CLI command "remove-subscription-dispatch-rate": removes the topic-level
+ // subscription dispatch rate through the admin client.
+ @Parameters(commandDescription = "Remove subscription
message-dispatch-rate for a topic")
+ private class RemoveSubscriptionDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeSubscriptionDispatchRate(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index fb785b6..0e3440b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -52,6 +52,7 @@ public class TopicPolicies {
private OffloadPolicies offloadPolicies;
private InactiveTopicPolicies inactiveTopicPolicies = null;
private DispatchRate dispatchRate = null;
+ private DispatchRate subscriptionDispatchRate = null;
private Long compactionThreshold = null;
private PublishRate publishRate = null;
private SubscribeRate subscribeRate = null;
@@ -116,6 +117,10 @@ public class TopicPolicies {
return dispatchRate != null;
}
+ /** Returns {@code true} when a topic-level subscription dispatch rate is present (non-null). */
+ public boolean isSubscriptionDispatchRateSet() {
+ return subscriptionDispatchRate != null;
+ }
+
public boolean isCompactionThresholdSet() {
return compactionThreshold != null;
}