This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1fb200ff70 [PIP-152] Support subscription level dispatch rate limiter
setting. (#15295)
a1fb200ff70 is described below
commit a1fb200ff707e9855efb563a27a894664a59c58b
Author: JiangHaiting <[email protected]>
AuthorDate: Fri May 6 20:34:25 2022 +0800
[PIP-152] Support subscription level dispatch rate limiter setting. (#15295)
---
.../broker/admin/impl/PersistentTopicsBase.java | 55 +++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 102 +++++++++++++++++++++
.../broker/service/AbstractBaseDispatcher.java | 4 +
.../pulsar/broker/service/AbstractTopic.java | 12 ++-
.../service/persistent/DispatchRateLimiter.java | 9 +-
.../PersistentDispatcherMultipleConsumers.java | 7 +-
.../PersistentDispatcherSingleActiveConsumer.java | 7 +-
.../pulsar/broker/admin/TopicPoliciesTest.java | 52 +++++++++++
.../pulsar/broker/service/PersistentTopicTest.java | 4 +-
.../apache/pulsar/client/admin/TopicPolicies.java | 45 +++++++++
.../client/admin/internal/TopicPoliciesImpl.java | 64 +++++++++++++
.../common/policies/data/SubscriptionPolicies.java | 45 +++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 6 ++
13 files changed, 402 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aa3fa6011dc..847406dfc89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -125,6 +125,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -4385,6 +4386,60 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected CompletableFuture<DispatchRate>
internalGetSubscriptionLevelDispatchRate(String subName, boolean applied,
+
boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenCompose(otp -> {
+ DispatchRateImpl rate = otp.map(tp ->
tp.getSubscriptionPolicies().get(subName))
+ .map(SubscriptionPolicies::getDispatchRate)
+ .orElse(null);
+ if (applied && rate == null) {
+ return internalGetSubscriptionDispatchRate(true,
isGlobal);
+ } else {
+ return CompletableFuture.completedFuture(rate);
+ }
+ });
+ }
+
+ protected CompletableFuture<Void>
internalSetSubscriptionLevelDispatchRate(String subName,
+
DispatchRateImpl dispatchRate,
+
boolean isGlobal) {
+ final DispatchRateImpl newDispatchRate =
DispatchRateImpl.normalize(dispatchRate);
+ if (newDispatchRate == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+ topicPolicies.setIsGlobal(isGlobal);
+ topicPolicies.getSubscriptionPolicies()
+ .computeIfAbsent(subName, k -> new
SubscriptionPolicies())
+ .setDispatchRate(newDispatchRate);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
+ });
+ }
+
+ protected CompletableFuture<Void>
internalRemoveSubscriptionLevelDispatchRate(String subName, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicPolicies topicPolicies = op.get();
+ SubscriptionPolicies sp =
topicPolicies.getSubscriptionPolicies().get(subName);
+ if (sp == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ sp.setDispatchRate(null);
+ if (sp.checkEmpty()) {
+ // cleanup empty SubscriptionPolicies
+ topicPolicies.getSubscriptionPolicies().remove(subName,
sp);
+ }
+ topicPolicies.setIsGlobal(isGlobal);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
op.get());
+ });
+ }
+
protected CompletableFuture<Optional<Integer>>
internalGetMaxConsumersPerSubscription(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4095530340c..b6424f5c425 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -3102,6 +3103,107 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
+ @ApiOperation(value = "Get message dispatch rate configuration for
specified subscription.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, please enable
the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getSubscriptionLevelDispatchRate(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("subName") @Encoded String encodedSubscriptionName,
+ @QueryParam("applied") @DefaultValue("false") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> internalGetSubscriptionLevelDispatchRate(
+ Codec.decode(encodedSubscriptionName), applied, isGlobal))
+ .thenApply(asyncResponse::resume)
+ .exceptionally(ex -> {
+ handleTopicPolicyException("getSubscriptionLevelDispatchRate",
ex, asyncResponse);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
+ @ApiOperation(value = "Set message dispatch rate configuration for
specified subscription.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, please enable
the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setSubscriptionLevelDispatchRate(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("subName") @Encoded String encodedSubscriptionName,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Subscription message dispatch rate for the
specified topic")
+ DispatchRateImpl dispatchRate) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> internalSetSubscriptionLevelDispatchRate(
+ Codec.decode(encodedSubscriptionName), dispatchRate,
isGlobal))
+ .thenRun(() -> {
+ log.info("[{}] Successfully set subscription level dispatch
rate:"
+ + " tenant={}, namespace={}, topic={}, sub={},
dispatchRate={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName(),
+ encodedSubscriptionName,
+ dispatchRate);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ handleTopicPolicyException("setSubscriptionLevelDispatchRate",
ex, asyncResponse);
+ return null;
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
+ @ApiOperation(value = "Remove message dispatch rate configuration for
specified subscription.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, please enable
the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeSubscriptionLevelDispatchRate(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("subName") @Encoded String encodedSubscriptionName,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate(
+ Codec.decode(encodedSubscriptionName), isGlobal))
+ .thenRun(() -> {
+ log.info("[{}] Successfully remove subscription level dispatch
rate: "
+ + "tenant={}, namespace={}, topic={}, sub={}",
+ clientAppId(), tenant, namespace,
topicName.getLocalName(), encodedSubscriptionName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+
handleTopicPolicyException("removeSubscriptionLevelDispatchRate", ex,
asyncResponse);
+ return null;
+ });
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Get compaction threshold configuration for
specified topic.")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d14f683070b..e7c840b9279 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -317,4 +317,8 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return Commands.peekStickyKey(metadataAndPayload,
subscription.getTopicName(), subscription.getName());
}
+
+ protected String getSubscriptionName() {
+ return subscription == null ? null : subscription.getName();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b853f665043..2c0d8841008 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -138,6 +139,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class,
"usageCount");
private volatile long usageCount = 0;
+ private Map<String/*subscription*/, SubscriptionPolicies>
subscriptionPolicies = Collections.emptyMap();
+
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
@@ -157,8 +160,11 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return this.topicPolicies.getSubscribeRate().get();
}
- public DispatchRateImpl getSubscriptionDispatchRate() {
- return this.topicPolicies.getSubscriptionDispatchRate().get();
+ public DispatchRateImpl getSubscriptionDispatchRate(String
subscriptionName) {
+ return Optional.ofNullable(subscriptionPolicies.get(subscriptionName))
+ .map(SubscriptionPolicies::getDispatchRate)
+ .map(DispatchRateImpl::normalize)
+
.orElse(this.topicPolicies.getSubscriptionDispatchRate().get());
}
public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
@@ -214,6 +220,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
+
+ this.subscriptionPolicies = data.getSubscriptionPolicies();
}
protected void updateTopicPolicyByNamespacePolicy(Policies
namespacePolicies) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index af2f45e9dcc..e8c14c99685 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -46,6 +46,7 @@ public class DispatchRateLimiter {
private final PersistentTopic topic;
private final String topicName;
+ private final String subscriptionName;
private final Type type;
private final BrokerService brokerService;
@@ -53,8 +54,13 @@ public class DispatchRateLimiter {
private RateLimiter dispatchRateLimiterOnByte;
public DispatchRateLimiter(PersistentTopic topic, Type type) {
+ this(topic, null, type);
+ }
+
+ public DispatchRateLimiter(PersistentTopic topic, String subscriptionName,
Type type) {
this.topic = topic;
this.topicName = topic.getName();
+ this.subscriptionName = subscriptionName;
this.brokerService = topic.getBrokerService();
this.type = type;
updateDispatchRate();
@@ -63,6 +69,7 @@ public class DispatchRateLimiter {
public DispatchRateLimiter(BrokerService brokerService) {
this.topic = null;
this.topicName = null;
+ this.subscriptionName = null;
this.brokerService = brokerService;
this.type = Type.BROKER;
updateDispatchRate();
@@ -175,7 +182,7 @@ public class DispatchRateLimiter {
dispatchRate = topic.getDispatchRate();
break;
case SUBSCRIPTION:
- dispatchRate = topic.getSubscriptionDispatchRate();
+ dispatchRate =
topic.getSubscriptionDispatchRate(subscriptionName);
break;
case REPLICATOR:
dispatchRate = topic.getReplicatorDispatchRate();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 517b08a4bd7..c9e5bcbcce6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -836,9 +836,10 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
@Override
public boolean initializeDispatchRateLimiterIfNeeded() {
- if (!dispatchRateLimiter.isPresent()
- &&
DispatchRateLimiter.isDispatchRateEnabled(topic.getSubscriptionDispatchRate()))
{
- this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(topic, Type.SUBSCRIPTION));
+ if (!dispatchRateLimiter.isPresent() &&
DispatchRateLimiter.isDispatchRateEnabled(
+ topic.getSubscriptionDispatchRate(getSubscriptionName()))) {
+ this.dispatchRateLimiter =
+ Optional.of(new DispatchRateLimiter(topic,
getSubscriptionName(), Type.SUBSCRIPTION));
return true;
}
return false;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c0effad9bbd..e062b070a65 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -558,9 +558,10 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public boolean initializeDispatchRateLimiterIfNeeded() {
- if (!dispatchRateLimiter.isPresent()
- &&
DispatchRateLimiter.isDispatchRateEnabled(topic.getSubscriptionDispatchRate()))
{
- this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(topic, Type.SUBSCRIPTION));
+ if (!dispatchRateLimiter.isPresent() &&
DispatchRateLimiter.isDispatchRateEnabled(
+ topic.getSubscriptionDispatchRate(getSubscriptionName()))) {
+ this.dispatchRateLimiter =
+ Optional.of(new DispatchRateLimiter(topic,
getSubscriptionName(), Type.SUBSCRIPTION));
return true;
}
return false;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 320a375145a..4d6a0a7b15e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1316,6 +1316,58 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete(topic, true);
}
+ @Test
+ public void testSubscriptionLevelDispatchRate() throws Exception{
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+
+ String subscriptionName = "testSubscriptionLevelDispatchRate";
+ @Cleanup
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+ AbstractTopic abstractTopic = (AbstractTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+
+ DispatchRate topicLevelRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(100)
+ .dispatchThrottlingRateInByte(1024)
+ .ratePeriodInSecond(1)
+ .build();
+ admin.topicPolicies().setSubscriptionDispatchRate(topic,
topicLevelRate);
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic),
topicLevelRate));
+
+ // topic level is set, applied value should be topic level setting.
+
Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic,
subscriptionName, true),
+ topicLevelRate);
+ // subscription level setting is not set.
+
Assert.assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic,
subscriptionName));
+
+ DispatchRate subLevelRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(1000)
+ .dispatchThrottlingRateInByte(1024 * 1024)
+ .ratePeriodInSecond(1)
+ .build();
+
+ admin.topicPolicies().setSubscriptionDispatchRate(topic,
subscriptionName, subLevelRate);
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic,
subscriptionName),
+ subLevelRate));
+
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(
+ abstractTopic.getSubscriptionDispatchRate(subscriptionName),
subLevelRate));
+
+ DispatchRateLimiter dispatchRateLimiter =
abstractTopic.getSubscription(subscriptionName)
+ .getDispatcher().getRateLimiter().get();
+ Assert.assertNotNull(dispatchRateLimiter);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(),
subLevelRate.getDispatchThrottlingRateInByte());
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(),
subLevelRate.getDispatchThrottlingRateInMsg());
+
+ admin.topicPolicies().removeSubscriptionDispatchRate(topic,
subscriptionName);
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic,
subscriptionName)));
+
+ admin.topics().delete(topic, true);
+ }
+
@Test
public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws
Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c41584a1dae..5383c35daa5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -364,7 +364,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PersistentTopic topic =
spyWithClassAndConstructorArgs(PersistentTopic.class, successTopicName,
ledgerMock, brokerService);
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
- PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, null);
+ Subscription subscription = mock(Subscription.class);
+ when(subscription.getName()).thenReturn("sub");
+ PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, subscription);
dispatcher.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
verify(topic, atLeast(1)).getBrokerService();
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 1dfb79d7ba0..dabe69301cf 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -910,6 +910,51 @@ public interface TopicPolicies {
*/
CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
+ /**
+ * Set dispatch rate limiter for a specific subscription.
+ */
+ void setSubscriptionDispatchRate(String topic, String subscriptionName,
DispatchRate dispatchRate)
+ throws PulsarAdminException;
+
+ /**
+ * Async version of {@link #setSubscriptionDispatchRate(String, String,
DispatchRate)}.
+ */
+ CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic,
String subscriptionName,
+ DispatchRate
dispatchRate);
+
+ /**
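+ * Get the dispatch rate for a specific subscription. If applied is true and no subscription level
+ * rate is set, fall back to the applied subscription dispatch rate of the topic.
+ * If applied is false, return only the subscription level setting (null when it is not set).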
+ */
+ DispatchRate getSubscriptionDispatchRate(String topic, String
subscriptionName, boolean applied)
+ throws PulsarAdminException;
+
+ /**
+ * Async version of {@link #getSubscriptionDispatchRate(String, String,
boolean)}.
+ */
+ CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String
topic, String subscriptionName,
+ boolean
applied);
+
+ /**
+ * Get subscription level dispatch rate limiter setting for a specific
subscription.
+ */
+ DispatchRate getSubscriptionDispatchRate(String topic, String
subscriptionName) throws PulsarAdminException;
+
+ /**
+ * Async version of {@link #getSubscriptionDispatchRate(String, String)}.
+ */
+ CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String
topic, String subscriptionName);
+
+ /**
+ * Remove subscription level dispatch rate limiter setting for a specific
subscription.
+ */
+ void removeSubscriptionDispatchRate(String topic, String subscriptionName)
throws PulsarAdminException;
+
+ /**
+ * Async version of {@link #removeSubscriptionDispatchRate(String,
String)}.
+ */
+ CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic,
String subscriptionName);
+
/**
* Set replicatorDispatchRate for the topic.
* <p/>
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index e5b1d697c06..5cba329f4e0 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -753,6 +753,70 @@ public class TopicPoliciesImpl extends BaseResource
implements TopicPolicies {
return asyncDeleteRequest(path);
}
+ @Override
+ public void setSubscriptionDispatchRate(String topic, String
subscriptionName, DispatchRate dispatchRate)
+ throws PulsarAdminException {
+ sync(() -> setSubscriptionDispatchRateAsync(topic, subscriptionName,
dispatchRate));
+ }
+
+ @Override
+ public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String
topic, String subscriptionName,
+
DispatchRate dispatchRate) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, subscriptionName,
"dispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public DispatchRate getSubscriptionDispatchRate(String topic, String
subscriptionName, boolean applied)
+ throws PulsarAdminException {
+ return sync(() -> getSubscriptionDispatchRateAsync(topic,
subscriptionName, applied));
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate>
getSubscriptionDispatchRateAsync(String topic, String subscriptionName,
+
boolean applied) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, subscriptionName,
"dispatchRate");
+ path = path.queryParam("applied", applied);
+ final CompletableFuture<DispatchRate> future = new
CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public DispatchRate getSubscriptionDispatchRate(String topic, String
subscriptionName) throws PulsarAdminException {
+ return sync(() -> getSubscriptionDispatchRateAsync(topic,
subscriptionName));
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate>
getSubscriptionDispatchRateAsync(String topic, String subscriptionName) {
+ return getSubscriptionDispatchRateAsync(topic, subscriptionName,
false);
+ }
+
+ @Override
+ public void removeSubscriptionDispatchRate(String topic, String
subscriptionName) throws PulsarAdminException {
+ sync(() -> removeSubscriptionDispatchRateAsync(topic,
subscriptionName));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String
topic, String subscriptionName) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, subscriptionName,
"dispatchRate");
+ return asyncDeleteRequest(path);
+ }
+
@Override
public Long getCompactionThreshold(String topic) throws
PulsarAdminException {
return getCompactionThreshold(topic, false);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java
new file mode 100644
index 00000000000..055ed28af97
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+
+/**
+ * Subscription policies.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SubscriptionPolicies {
+ private DispatchRateImpl dispatchRate;
+
+ /**
+ * Check if this SubscriptionPolicies is empty. Empty SubscriptionPolicies
can be auto removed from TopicPolicies.
+ * @return true if this SubscriptionPolicies is empty.
+ */
+ public boolean checkEmpty() {
+ return dispatchRate == null;
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 6e81509c830..ae04c2e2178 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -72,6 +72,12 @@ public class TopicPolicies {
private DispatchRateImpl replicatorDispatchRate;
private SchemaCompatibilityStrategy schemaCompatibilityStrategy;
+ /**
+ * Subscription level policies, keyed by subscription name.
+ */
+ @Builder.Default
+ private Map<String/*subscription*/, SubscriptionPolicies>
subscriptionPolicies = new HashMap<>();
+
public boolean isGlobalPolicies() {
return isGlobal != null && isGlobal;
}