This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1fb200ff70 [PIP-152] Support subscription level dispatch rate limiter 
setting. (#15295)
a1fb200ff70 is described below

commit a1fb200ff707e9855efb563a27a894664a59c58b
Author: JiangHaiting <[email protected]>
AuthorDate: Fri May 6 20:34:25 2022 +0800

    [PIP-152] Support subscription level dispatch rate limiter setting. (#15295)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  55 +++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 102 +++++++++++++++++++++
 .../broker/service/AbstractBaseDispatcher.java     |   4 +
 .../pulsar/broker/service/AbstractTopic.java       |  12 ++-
 .../service/persistent/DispatchRateLimiter.java    |   9 +-
 .../PersistentDispatcherMultipleConsumers.java     |   7 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   7 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  52 +++++++++++
 .../pulsar/broker/service/PersistentTopicTest.java |   4 +-
 .../apache/pulsar/client/admin/TopicPolicies.java  |  45 +++++++++
 .../client/admin/internal/TopicPoliciesImpl.java   |  64 +++++++++++++
 .../common/policies/data/SubscriptionPolicies.java |  45 +++++++++
 .../pulsar/common/policies/data/TopicPolicies.java |   6 ++
 13 files changed, 402 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aa3fa6011dc..847406dfc89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -125,6 +125,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -4385,6 +4386,60 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
+    protected CompletableFuture<DispatchRate> 
internalGetSubscriptionLevelDispatchRate(String subName, boolean applied,
+                                                                               
        boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(otp -> {
+                    DispatchRateImpl rate = otp.map(tp -> 
tp.getSubscriptionPolicies().get(subName))
+                            .map(SubscriptionPolicies::getDispatchRate)
+                            .orElse(null);
+                    if (applied && rate == null) {
+                        return internalGetSubscriptionDispatchRate(true, 
isGlobal);
+                    } else {
+                        return CompletableFuture.completedFuture(rate);
+                    }
+                });
+    }
+
+    protected CompletableFuture<Void> 
internalSetSubscriptionLevelDispatchRate(String subName,
+                                                                               
DispatchRateImpl dispatchRate,
+                                                                               
boolean isGlobal) {
+        final DispatchRateImpl newDispatchRate = 
DispatchRateImpl.normalize(dispatchRate);
+        if (newDispatchRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setIsGlobal(isGlobal);
+                    topicPolicies.getSubscriptionPolicies()
+                            .computeIfAbsent(subName, k -> new 
SubscriptionPolicies())
+                            .setDispatchRate(newDispatchRate);
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+                });
+    }
+
+    protected CompletableFuture<Void> 
internalRemoveSubscriptionLevelDispatchRate(String subName, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                TopicPolicies topicPolicies = op.get();
+                SubscriptionPolicies sp = 
topicPolicies.getSubscriptionPolicies().get(subName);
+                if (sp == null) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                sp.setDispatchRate(null);
+                if (sp.checkEmpty()) {
+                    // cleanup empty SubscriptionPolicies
+                    topicPolicies.getSubscriptionPolicies().remove(subName, 
sp);
+                }
+                topicPolicies.setIsGlobal(isGlobal);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
+    }
+
 
     protected CompletableFuture<Optional<Integer>> 
internalGetMaxConsumersPerSubscription(boolean isGlobal) {
         return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4095530340c..b6424f5c425 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -3102,6 +3103,107 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
+    @ApiOperation(value = "Get message dispatch rate configuration for 
specified subscription.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getSubscriptionLevelDispatchRate(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("subName") @Encoded String encodedSubscriptionName,
+            @QueryParam("applied") @DefaultValue("false") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetSubscriptionLevelDispatchRate(
+                    Codec.decode(encodedSubscriptionName), applied, isGlobal))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getSubscriptionLevelDispatchRate", 
ex, asyncResponse);
+                return null;
+            });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
+    @ApiOperation(value = "Set message dispatch rate configuration for 
specified subscription.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setSubscriptionLevelDispatchRate(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("subName") @Encoded String encodedSubscriptionName,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Subscription message dispatch rate for the 
specified topic")
+                    DispatchRateImpl dispatchRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetSubscriptionLevelDispatchRate(
+                    Codec.decode(encodedSubscriptionName), dispatchRate, 
isGlobal))
+            .thenRun(() -> {
+                log.info("[{}] Successfully set subscription level dispatch 
rate:"
+                                + " tenant={}, namespace={}, topic={}, sub={}, 
dispatchRate={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName(),
+                        encodedSubscriptionName,
+                        dispatchRate);
+                asyncResponse.resume(Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setSubscriptionLevelDispatchRate", 
ex, asyncResponse);
+                return null;
+            });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
+    @ApiOperation(value = "Remove message dispatch rate configuration for 
specified subscription.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeSubscriptionLevelDispatchRate(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("subName") @Encoded String encodedSubscriptionName,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate(
+                    Codec.decode(encodedSubscriptionName), isGlobal))
+            .thenRun(() -> {
+                log.info("[{}] Successfully remove subscription level dispatch 
rate: "
+                                + "tenant={}, namespace={}, topic={}, sub={}",
+                        clientAppId(), tenant, namespace, 
topicName.getLocalName(), encodedSubscriptionName);
+                asyncResponse.resume(Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                
handleTopicPolicyException("removeSubscriptionLevelDispatchRate", ex, 
asyncResponse);
+                return null;
+            });
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
     @ApiOperation(value = "Get compaction threshold configuration for 
specified topic.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d14f683070b..e7c840b9279 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -317,4 +317,8 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
     protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
         return Commands.peekStickyKey(metadataAndPayload, 
subscription.getTopicName(), subscription.getName());
     }
+
+    protected String getSubscriptionName() {
+        return subscription == null ? null : subscription.getName();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b853f665043..2c0d8841008 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -138,6 +139,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, 
"usageCount");
     private volatile long usageCount = 0;
 
+    private Map<String/*subscription*/, SubscriptionPolicies> 
subscriptionPolicies = Collections.emptyMap();
+
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
         this.brokerService = brokerService;
@@ -157,8 +160,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return this.topicPolicies.getSubscribeRate().get();
     }
 
-    public DispatchRateImpl getSubscriptionDispatchRate() {
-        return this.topicPolicies.getSubscriptionDispatchRate().get();
+    public DispatchRateImpl getSubscriptionDispatchRate(String 
subscriptionName) {
+        return Optional.ofNullable(subscriptionPolicies.get(subscriptionName))
+                .map(SubscriptionPolicies::getDispatchRate)
+                .map(DispatchRateImpl::normalize)
+                
.orElse(this.topicPolicies.getSubscriptionDispatchRate().get());
     }
 
     public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
@@ -214,6 +220,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
         
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
         
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
+
+        this.subscriptionPolicies = data.getSubscriptionPolicies();
     }
 
     protected void updateTopicPolicyByNamespacePolicy(Policies 
namespacePolicies) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index af2f45e9dcc..e8c14c99685 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -46,6 +46,7 @@ public class DispatchRateLimiter {
 
     private final PersistentTopic topic;
     private final String topicName;
+    private final String subscriptionName;
     private final Type type;
 
     private final BrokerService brokerService;
@@ -53,8 +54,13 @@ public class DispatchRateLimiter {
     private RateLimiter dispatchRateLimiterOnByte;
 
     public DispatchRateLimiter(PersistentTopic topic, Type type) {
+        this(topic, null, type);
+    }
+
+    public DispatchRateLimiter(PersistentTopic topic, String subscriptionName, 
Type type) {
         this.topic = topic;
         this.topicName = topic.getName();
+        this.subscriptionName = subscriptionName;
         this.brokerService = topic.getBrokerService();
         this.type = type;
         updateDispatchRate();
@@ -63,6 +69,7 @@ public class DispatchRateLimiter {
     public DispatchRateLimiter(BrokerService brokerService) {
         this.topic = null;
         this.topicName = null;
+        this.subscriptionName = null;
         this.brokerService = brokerService;
         this.type = Type.BROKER;
         updateDispatchRate();
@@ -175,7 +182,7 @@ public class DispatchRateLimiter {
                 dispatchRate = topic.getDispatchRate();
                 break;
             case SUBSCRIPTION:
-                dispatchRate = topic.getSubscriptionDispatchRate();
+                dispatchRate = 
topic.getSubscriptionDispatchRate(subscriptionName);
                 break;
             case REPLICATOR:
                 dispatchRate = topic.getReplicatorDispatchRate();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 517b08a4bd7..c9e5bcbcce6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -836,9 +836,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
     @Override
     public boolean initializeDispatchRateLimiterIfNeeded() {
-        if (!dispatchRateLimiter.isPresent()
-            && 
DispatchRateLimiter.isDispatchRateEnabled(topic.getSubscriptionDispatchRate())) 
{
-            this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(topic, Type.SUBSCRIPTION));
+        if (!dispatchRateLimiter.isPresent() && 
DispatchRateLimiter.isDispatchRateEnabled(
+                topic.getSubscriptionDispatchRate(getSubscriptionName()))) {
+            this.dispatchRateLimiter =
+                    Optional.of(new DispatchRateLimiter(topic, 
getSubscriptionName(), Type.SUBSCRIPTION));
             return true;
         }
         return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c0effad9bbd..e062b070a65 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -558,9 +558,10 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public boolean initializeDispatchRateLimiterIfNeeded() {
-        if (!dispatchRateLimiter.isPresent()
-            && 
DispatchRateLimiter.isDispatchRateEnabled(topic.getSubscriptionDispatchRate())) 
{
-            this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(topic, Type.SUBSCRIPTION));
+        if (!dispatchRateLimiter.isPresent() && 
DispatchRateLimiter.isDispatchRateEnabled(
+                topic.getSubscriptionDispatchRate(getSubscriptionName()))) {
+            this.dispatchRateLimiter =
+                    Optional.of(new DispatchRateLimiter(topic, 
getSubscriptionName(), Type.SUBSCRIPTION));
             return true;
         }
         return false;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 320a375145a..4d6a0a7b15e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1316,6 +1316,58 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(topic, true);
     }
 
+    @Test
+    public void testSubscriptionLevelDispatchRate() throws Exception{
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+
+        String subscriptionName = "testSubscriptionLevelDispatchRate";
+        @Cleanup
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+        AbstractTopic abstractTopic = (AbstractTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+
+        DispatchRate topicLevelRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(100)
+                .dispatchThrottlingRateInByte(1024)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.topicPolicies().setSubscriptionDispatchRate(topic, 
topicLevelRate);
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic), 
topicLevelRate));
+
+        // topic level is set, applied value should be topic level setting.
+        
Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic, 
subscriptionName, true),
+                topicLevelRate);
+        // subscription level setting is not set.
+        
Assert.assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic, 
subscriptionName));
+
+        DispatchRate subLevelRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(1000)
+                .dispatchThrottlingRateInByte(1024 * 1024)
+                .ratePeriodInSecond(1)
+                .build();
+
+        admin.topicPolicies().setSubscriptionDispatchRate(topic, 
subscriptionName, subLevelRate);
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic, 
subscriptionName),
+                        subLevelRate));
+
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
+                abstractTopic.getSubscriptionDispatchRate(subscriptionName), 
subLevelRate));
+
+        DispatchRateLimiter dispatchRateLimiter = 
abstractTopic.getSubscription(subscriptionName)
+                .getDispatcher().getRateLimiter().get();
+        Assert.assertNotNull(dispatchRateLimiter);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 
subLevelRate.getDispatchThrottlingRateInByte());
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 
subLevelRate.getDispatchThrottlingRateInMsg());
+
+        admin.topicPolicies().removeSubscriptionDispatchRate(topic, 
subscriptionName);
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic, 
subscriptionName)));
+
+        admin.topics().delete(topic, true);
+    }
+
     @Test
     public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws 
Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c41584a1dae..5383c35daa5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -364,7 +364,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentTopic topic = 
spyWithClassAndConstructorArgs(PersistentTopic.class, successTopicName, 
ledgerMock, brokerService);
         ManagedCursor cursor = mock(ManagedCursor.class);
         when(cursor.getName()).thenReturn("cursor");
-        PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursor, null);
+        Subscription subscription = mock(Subscription.class);
+        when(subscription.getName()).thenReturn("sub");
+        PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursor, subscription);
         dispatcher.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
         verify(topic, atLeast(1)).getBrokerService();
     }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 1dfb79d7ba0..dabe69301cf 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -910,6 +910,51 @@ public interface TopicPolicies {
      */
     CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
 
+    /**
+     * Set dispatch rate limiter for a specific subscription.
+     */
+    void setSubscriptionDispatchRate(String topic, String subscriptionName, 
DispatchRate dispatchRate)
+            throws PulsarAdminException;
+
+    /**
+     * Async version of {@link #setSubscriptionDispatchRate(String, String, 
DispatchRate)}.
+     */
+    CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, 
String subscriptionName,
+                                                             DispatchRate 
dispatchRate);
+
+    /**
+     * Get the subscription level dispatch rate, or null if it is not set.
+     * If applied is true and it is unset, return the topic's applied rate.
+     */
+    DispatchRate getSubscriptionDispatchRate(String topic, String 
subscriptionName, boolean applied)
+            throws PulsarAdminException;
+
+    /**
+     * Async version of {@link #getSubscriptionDispatchRate(String, String, 
boolean)}.
+     */
+    CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String 
topic, String subscriptionName,
+                                                                     boolean 
applied);
+
+    /**
+     * Get subscription level dispatch rate limiter setting for a specific 
subscription.
+     */
+    DispatchRate getSubscriptionDispatchRate(String topic, String 
subscriptionName) throws PulsarAdminException;
+
+    /**
+     * Async version of {@link #getSubscriptionDispatchRate(String, String)}.
+     */
+    CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String 
topic, String subscriptionName);
+
+    /**
+     * Remove subscription level dispatch rate limiter setting for a specific 
subscription.
+     */
+    void removeSubscriptionDispatchRate(String topic, String subscriptionName) 
throws PulsarAdminException;
+
+    /**
+     * Async version of {@link #removeSubscriptionDispatchRate(String, 
String)}.
+     */
+    CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic, 
String subscriptionName);
+
     /**
      * Set replicatorDispatchRate for the topic.
      * <p/>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index e5b1d697c06..5cba329f4e0 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -753,6 +753,70 @@ public class TopicPoliciesImpl extends BaseResource 
implements TopicPolicies {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setSubscriptionDispatchRate(String topic, String 
subscriptionName, DispatchRate dispatchRate)
+            throws PulsarAdminException {
+        sync(() -> setSubscriptionDispatchRateAsync(topic, subscriptionName, 
dispatchRate));
+    }
+
+    @Override
+    public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String 
topic, String subscriptionName,
+                                                                    
DispatchRate dispatchRate) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, subscriptionName, 
"dispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public DispatchRate getSubscriptionDispatchRate(String topic, String 
subscriptionName, boolean applied)
+            throws PulsarAdminException {
+        return sync(() -> getSubscriptionDispatchRateAsync(topic, 
subscriptionName, applied));
+    }
+
+    @Override
+    public CompletableFuture<DispatchRate> 
getSubscriptionDispatchRateAsync(String topic, String subscriptionName,
+                                                                            
boolean applied) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, subscriptionName, 
"dispatchRate");
+        path = path.queryParam("applied", applied);
+        final CompletableFuture<DispatchRate> future = new 
CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DispatchRate>() {
+                    @Override
+                    public void completed(DispatchRate dispatchRate) {
+                        future.complete(dispatchRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public DispatchRate getSubscriptionDispatchRate(String topic, String 
subscriptionName) throws PulsarAdminException {
+        return sync(() -> getSubscriptionDispatchRateAsync(topic, 
subscriptionName));
+    }
+
+    @Override
+    public CompletableFuture<DispatchRate> 
getSubscriptionDispatchRateAsync(String topic, String subscriptionName) {
+        return getSubscriptionDispatchRateAsync(topic, subscriptionName, 
false);
+    }
+
+    @Override
+    public void removeSubscriptionDispatchRate(String topic, String 
subscriptionName) throws PulsarAdminException {
+        sync(() -> removeSubscriptionDispatchRateAsync(topic, 
subscriptionName));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String 
topic, String subscriptionName) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, subscriptionName, 
"dispatchRate");
+        return asyncDeleteRequest(path);
+    }
+
     @Override
     public Long getCompactionThreshold(String topic) throws 
PulsarAdminException {
         return getCompactionThreshold(topic, false);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java
new file mode 100644
index 00000000000..055ed28af97
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+
+/**
+ * Policies that apply to a single subscription of a topic.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SubscriptionPolicies {
+    private DispatchRateImpl dispatchRate;
+
+    /**
+     * Check if this SubscriptionPolicies is empty. Empty SubscriptionPolicies 
can be auto removed from TopicPolicies.
+     * @return true if no policy (currently only dispatchRate) is set.
+     */
+    public boolean checkEmpty() {
+        return dispatchRate == null;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 6e81509c830..ae04c2e2178 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -72,6 +72,12 @@ public class TopicPolicies {
     private DispatchRateImpl replicatorDispatchRate;
     private SchemaCompatibilityStrategy schemaCompatibilityStrategy;
 
+    /**
+     * Subscription level policies, keyed by subscription name.
+     */
+    @Builder.Default
+    private Map<String/*subscription*/, SubscriptionPolicies> 
subscriptionPolicies = new HashMap<>();
+
     public boolean isGlobalPolicies() {
         return isGlobal != null && isGlobal;
     }

Reply via email to