nodece commented on code in PR #15295:
URL: https://github.com/apache/pulsar/pull/15295#discussion_r857358213
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4396,6 +4397,60 @@ protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate(boolean
});
}
+    protected CompletableFuture<DispatchRate> internalGetSubscriptionLevelDispatchRate(String subName, boolean applied,
+                                                                                       boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(otp -> {
+                    DispatchRateImpl rate = otp.map(tp -> tp.getSubscriptionPolicies().get(subName))
+                            .map(SubscriptionPolicies::getDispatchRate)
+                            .orElse(null);
+                    if (applied && rate == null) {
+                        return internalGetSubscriptionDispatchRate(true, isGlobal);
+                    } else {
+                        return CompletableFuture.completedFuture(rate);
+                    }
+                });
+    }
+
+    protected CompletableFuture<Void> internalSetSubscriptionLevelDispatchRate(String subName,
+                                                                               DispatchRateImpl dispatchRate,
+                                                                               boolean isGlobal) {
+        final DispatchRateImpl newDispatchRate = DispatchRateImpl.normalize(dispatchRate);
+        if (newDispatchRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setIsGlobal(isGlobal);
+                    topicPolicies.getSubscriptionPolicies()
+                            .computeIfAbsent(subName, k -> new SubscriptionPolicies())
+                            .setDispatchRate(newDispatchRate);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
+    }
+
+    protected CompletableFuture<Void> internalRemoveSubscriptionLevelDispatchRate(String subName, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    TopicPolicies topicPolicies = op.get();
+                    SubscriptionPolicies sp = topicPolicies.getSubscriptionPolicies().get(subName);
+                    if (sp == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    sp.setDispatchRate(null);
+                    if (sp.checkEmpty()) {
+                        // cleanup empty SubscriptionPolicies
+                        topicPolicies.getSubscriptionPolicies().remove(subName, sp);
+                    }
+                    topicPolicies.setIsGlobal(isGlobal);
Review Comment:
Got
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]