This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 76883d4 Support get applied clusterSubscribeRate (#9832)
76883d4 is described below
commit 76883d455b44b7b0b73684d0212002397dbb9b2c
Author: feynmanlin <[email protected]>
AuthorDate: Wed Mar 10 14:47:32 2021 +0800
Support get applied clusterSubscribeRate (#9832)
Master Issue: #9216
### Modifications
1. Add an applied API for the topic-level subscribe rate
2. Add a remove API for the namespace-level subscribe rate
### Verifying this change
Verify the applied API and the CLI commands
---
.../apache/pulsar/broker/admin/AdminResource.java | 3 ---
.../pulsar/broker/admin/impl/NamespacesBase.java | 25 ++++++++++++-----
.../broker/admin/impl/PersistentTopicsBase.java | 14 ++++++++--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 9 +++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 18 ++++---------
.../apache/pulsar/broker/admin/AdminApiTest.java | 1 -
.../pulsar/broker/admin/TopicPoliciesTest.java | 31 ++++++++++++++++++++++
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 1 -
.../org/apache/pulsar/client/admin/Namespaces.java | 15 +++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 18 +++++++++++++
.../client/admin/internal/NamespacesImpl.java | 21 +++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 15 +++++++++--
.../pulsar/admin/cli/PulsarAdminToolTest.java | 12 +++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 5 +++-
15 files changed, 171 insertions(+), 30 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index a528a79..d541fc4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -394,9 +394,6 @@ public abstract class AdminResource extends
PulsarWebResource {
policies.topicDispatchRate.put(cluster, dispatchRate());
}
- if (policies.clusterSubscribeRate.isEmpty()) {
- policies.clusterSubscribeRate.put(cluster, subscribeRate());
- }
}
protected BacklogQuota namespaceBacklogQuota(String namespace, String
namespacePath) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2455d56..dfc265e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1256,16 +1256,27 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
+ protected void internalDeleteSubscribeRate() {
+ validateSuperUserAccess();
+ try {
+ final String path = path(POLICIES, namespaceName.toString());
+ updatePolicies(path, (policies) -> {
+
policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
+ return policies;
+ });
+ log.info("[{}] Successfully delete the subscribeRate for cluster
on namespace {}", clientAppId(),
+ namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete the subscribeRate for cluster on
namespace {}", clientAppId(),
+ namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+
protected SubscribeRate internalGetSubscribeRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
- SubscribeRate subscribeRate =
policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
- if (subscribeRate != null) {
- return subscribeRate;
- } else {
- throw new RestException(Status.NOT_FOUND,
- "Subscribe-rate is not configured for cluster " +
pulsar().getConfiguration().getClusterName());
- }
+ return
policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
}
protected void internalRemoveReplicatorDispatchRate() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index dc77aba..78dd656 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3850,8 +3850,18 @@ public class PersistentTopicsBase extends AdminResource {
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
}
- protected Optional<SubscribeRate> internalGetSubscribeRate() {
- return
getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
+ protected CompletableFuture<SubscribeRate>
internalGetSubscribeRate(boolean applied) {
+ SubscribeRate subscribeRate = getTopicPolicies(topicName)
+ .map(TopicPolicies::getSubscribeRate)
+ .orElseGet(() -> {
+ if (applied) {
+ SubscribeRate namespacePolicy =
getNamespacePolicies(namespaceName)
+
.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
+ return namespacePolicy == null ? subscribeRate() :
namespacePolicy;
+ }
+ return null;
+ });
+ return CompletableFuture.completedFuture(subscribeRate);
}
protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate
subscribeRate) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5044cf9..3195f6b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -607,6 +607,15 @@ public class Namespaces extends NamespacesBase {
internalDeleteSubscriptionDispatchRate();
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/subscribeRate")
+ @ApiOperation(value = "Delete subscribe-rate throttling for all topics of
the namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ public void deleteSubscribeRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalDeleteSubscribeRate();
+ }
+
@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the
namespace")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d286bca..5353cf9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3029,21 +3029,13 @@ public class PersistentTopics extends
PersistentTopicsBase {
public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String
encodedTopic) {
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
- try {
- Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
- if (!subscribeRate.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(subscribeRate.get());
- }
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ internalGetSubscribeRate(applied).whenComplete((res, ex) -> {
+ internalHandleResult(asyncResponse, res, ex, "Failed get subscribe
rate");
+ });
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 924d980..6b121d8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -708,7 +708,6 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
policies.auth_policies.namespace_auth.put("my-role",
EnumSet.allOf(AuthAction.class));
policies.topicDispatchRate.put("test",
ConfigHelper.topicDispatchRate(conf));
- policies.clusterSubscribeRate.put("test",
ConfigHelper.subscribeRate(conf));
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/ns1"),
policies.auth_policies.namespace_auth);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 5e93430..456af80 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1317,6 +1317,37 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(persistenceTopic, true);
}
+ @Test(timeOut = 20000)
+ public void testGetSubscribeRateApplied() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ assertNull(admin.topics().getSubscribeRate(topic));
+ assertNull(admin.namespaces().getSubscribeRate(myNamespace));
+ SubscribeRate brokerPolicy = new SubscribeRate(
+
pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
+
pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
+ );
+ assertEquals(admin.topics().getSubscribeRate(topic, true),
brokerPolicy);
+ SubscribeRate namespacePolicy = new SubscribeRate(10, 11);
+
+ admin.namespaces().setSubscribeRate(myNamespace, namespacePolicy);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getSubscribeRate(myNamespace)));
+ assertEquals(admin.topics().getSubscribeRate(topic, true),
namespacePolicy);
+
+ SubscribeRate topicPolicy = new SubscribeRate(20, 21);
+ admin.topics().setSubscribeRate(topic, topicPolicy);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getSubscribeRate(topic)));
+ assertEquals(admin.topics().getSubscribeRate(topic, true),
topicPolicy);
+
+ admin.namespaces().removeSubscribeRate(myNamespace);
+ admin.topics().removeSubscribeRate(topic);
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getSubscribeRate(myNamespace)));
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.topics().getSubscribeRate(topic)));
+ assertEquals(admin.topics().getSubscribeRate(topic, true),
brokerPolicy);
+ }
+
@Test
public void testRemoveSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 6f56bb2..c843de2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -644,7 +644,6 @@ public class V1_AdminApiTest extends
MockedPulsarServiceBaseTest {
policies.auth_policies.namespace_auth.put("my-role",
EnumSet.allOf(AuthAction.class));
policies.topicDispatchRate.put("test",
ConfigHelper.topicDispatchRate(conf));
- policies.clusterSubscribeRate.put("test",
ConfigHelper.subscribeRate(conf));
assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"),
policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/use/ns1"),
policies.auth_policies.namespace_auth);
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 9a6bc42..3d28bd5 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2081,6 +2081,21 @@ public interface Namespaces {
CompletableFuture<Void> setSubscribeRateAsync(String namespace,
SubscribeRate subscribeRate);
/**
+ * Remove namespace-subscribe-rate (topics under this namespace will limit
by subscribeRate).
+ *
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeSubscribeRate(String namespace) throws PulsarAdminException;
+
+ /**
+ * Remove namespace-subscribe-rate (topics under this namespace will limit
by subscribeRate) asynchronously.
+ *
+ * @param namespace
+ */
+ CompletableFuture<Void> removeSubscribeRateAsync(String namespace);
+
+ /**
* Get namespace-subscribe-rate (topics under this namespace allow
subscribe times per consumer in a period).
*
* @param namespace
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 881979f..eec595f 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -3126,6 +3126,24 @@ public interface Topics {
CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic);
/**
+ * Get applied topic-subscribe-rate (topics allow subscribe times per
consumer in a period).
+ *
+ * @param topic
+ * @returns subscribeRate
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ SubscribeRate getSubscribeRate(String topic, boolean applied) throws
PulsarAdminException;
+
+ /**
+ * Get applied topic-subscribe-rate asynchronously.
+ *
+ * @param topic
+ * @returns subscribeRate
+ */
+ CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic,
boolean applied);
+
+ /**
* Remove topic-subscribe-rate.
* <p/>
* Remove topic subscribe rate
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 8786b95..2b819a7 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1685,6 +1685,27 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public void removeSubscribeRate(String namespace) throws
PulsarAdminException {
+ try {
+ removeSubscribeRateAsync(namespace).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeSubscribeRateAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscribeRate");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public SubscribeRate getSubscribeRate(String namespace) throws
PulsarAdminException {
try {
return getSubscribeRateAsync(namespace).
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 1a3135c..e15c06d 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -3335,8 +3335,18 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public SubscribeRate getSubscribeRate(String topic) throws
PulsarAdminException {
+ return getSubscribeRate(topic, false);
+ }
+
+ @Override
+ public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String
topic) {
+ return getSubscribeRateAsync(topic, false);
+ }
+
+ @Override
+ public SubscribeRate getSubscribeRate(String topic, boolean applied)
throws PulsarAdminException {
try {
- return getSubscribeRateAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ return getSubscribeRateAsync(topic,
applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
@@ -3348,9 +3358,10 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
- public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String
topic) {
+ public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String
topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscribeRate");
+ path = path.queryParam("applied", applied);
final CompletableFuture<SubscribeRate> future = new
CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<SubscribeRate>() {
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 1c11db5..3e9dbc3 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -548,6 +548,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-subscribe-rate myprop/clust/ns1"));
verify(mockNamespaces).getSubscribeRate("myprop/clust/ns1");
+ namespaces.run(split("remove-subscribe-rate myprop/clust/ns1"));
+ verify(mockNamespaces).removeSubscribeRate("myprop/clust/ns1");
+
namespaces.run(split("set-subscription-dispatch-rate myprop/clust/ns1
-md -1 -bd -1 -dt 2"));
verify(mockNamespaces).setSubscriptionDispatchRate("myprop/clust/ns1",
new DispatchRate(-1, -1, 2));
@@ -766,6 +769,15 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1
-s sub1 -t 100"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1",
"sub1", 100);
+ cmdTopics.run(split("get-subscribe-rate
persistent://myprop/clust/ns1/ds1 -ap"));
+
verify(mockTopics).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);
+
+ cmdTopics.run(split("set-subscribe-rate
persistent://myprop/clust/ns1/ds1 -sr 2 -st 60"));
+
verify(mockTopics).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new
SubscribeRate(2, 60));
+
+ cmdTopics.run(split("remove-subscribe-rate
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");
+
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1
-s sub1 -p 1:1 -e"));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 02f6654..aff6ea8 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -853,6 +853,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove configured subscribe-rate per
consumer for all topics of the namespace")
+ private class RemoveSubscribeRate extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ getAdmin().namespaces().removeSubscribeRate(namespace);
+ }
+ }
+
@Parameters(commandDescription = "Set subscription message-dispatch-rate
for all subscription of the namespace")
private class SetSubscriptionDispatchRate extends CliCommand {
@@ -2114,6 +2126,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
+ jcommander.addCommand("remove-subscribe-rate", new
RemoveSubscribeRate());
jcommander.addCommand("set-subscription-dispatch-rate", new
SetSubscriptionDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new
GetSubscriptionDispatchRate());
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 4d4c177..ee36b4d 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -2202,10 +2202,13 @@ public class CmdTopics extends CmdBase {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the
applied policy of the topic")
+ private boolean applied = false;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- print(getAdmin().topics().getSubscribeRate(persistentTopic));
+ print(getAdmin().topics().getSubscribeRate(persistentTopic,
applied));
}
}