This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b1003d1281d Add cli cmd for subscription level dispatch-rate-limiter (#15862)
b1003d1281d is described below
commit b1003d1281d690e3da37d44718d4f335f4cbd23f
Author: JiangHaiting <[email protected]>
AuthorDate: Thu Jun 2 11:01:21 2022 +0800
Add cli cmd for subscription level dispatch-rate-limiter (#15862)
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 26 +++++++++++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 43 +++++++++++++++++-----
2 files changed, 60 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index dd48ff0f317..2b46d2ab760 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -995,6 +995,19 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+ cmdTopics = new CmdTopicPolicies(() -> admin);
+ cmdTopics.run(split("set-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 3"));
+
verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
"sub",
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(-1)
+ .ratePeriodInSecond(3)
+ .build());
+ cmdTopics.run(split("get-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -s sub"));
+
verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
"sub",false);
+ cmdTopics.run(split("remove-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -s sub"));
+
verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
"sub");
+
cmdTopics.run(split("get-persistence
persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1
-e 2 -w 1 -a 1 -r 100.0"));
@@ -1293,6 +1306,19 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+ cmdTopics = new CmdTopicPolicies(() -> admin);
+ cmdTopics.run(split("set-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 2 -g"));
+
verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
"sub",
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(-1)
+ .ratePeriodInSecond(2)
+ .build());
+ cmdTopics.run(split("get-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -s sub -g"));
+
verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
"sub",false);
+ cmdTopics.run(split("remove-subscription-dispatch-rate
persistent://myprop/clust/ns1/ds1 -s sub -g"));
+
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1","sub");
+
cmdTopics.run(split("get-max-subscriptions-per-topic
persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-subscriptions-per-topic
persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 1314f649c86..8a4821df152 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
@@ -1461,10 +1462,18 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
+ @Parameter(names = {"--subscription", "-s"},
+ description = "Get message-dispatch-rate of a specific
subscription")
+ private String subName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
-
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic,
applied));
+ if (StringUtils.isBlank(subName)) {
+
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic,
applied));
+ } else {
+
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic,
subName, applied));
+ }
}
}
@@ -1495,16 +1504,24 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other
clusters asynchronously")
private boolean isGlobal = false;
+ @Parameter(names = {"--subscription", "-s"},
+ description = "Set message-dispatch-rate for a specific
subscription")
+ private String subName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
-
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
- DispatchRate.builder()
- .dispatchThrottlingRateInMsg(msgDispatchRate)
- .dispatchThrottlingRateInByte(byteDispatchRate)
- .ratePeriodInSecond(dispatchRatePeriodSec)
- .relativeToPublishRate(relativeToPublishRate)
- .build());
+ DispatchRate rate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(msgDispatchRate)
+ .dispatchThrottlingRateInByte(byteDispatchRate)
+ .ratePeriodInSecond(dispatchRatePeriodSec)
+ .relativeToPublishRate(relativeToPublishRate)
+ .build();
+ if (StringUtils.isBlank(subName)) {
+
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, rate);
+ } else {
+
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
subName, rate);
+ }
}
}
@@ -1517,10 +1534,18 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other
clusters asynchronously")
private boolean isGlobal = false;
+ @Parameter(names = {"--subscription", "-s"},
+ description = "Remove message-dispatch-rate for a specific
subscription")
+ private String subName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
-
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+ if (StringUtils.isBlank(subName)) {
+
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+ } else {
+
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic,
subName);
+ }
}
}