nodece commented on code in PR #15295:
URL: https://github.com/apache/pulsar/pull/15295#discussion_r857337552


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4396,6 +4397,60 @@ protected CompletableFuture<Void> 
internalRemoveSubscriptionDispatchRate(boolean
             });
     }
 
+    protected CompletableFuture<DispatchRate> 
internalGetSubscriptionLevelDispatchRate(String subName, boolean applied,
+                                                                               
        boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(otp -> {
+                    DispatchRateImpl rate = otp.map(tp -> 
tp.getSubscriptionPolicies().get(subName))
+                            .map(SubscriptionPolicies::getDispatchRate)
+                            .orElse(null);
+                    if (applied && rate == null) {
+                        return internalGetSubscriptionDispatchRate(true, 
isGlobal);
+                    } else {
+                        return CompletableFuture.completedFuture(rate);
+                    }
+                });
+    }
+
+    protected CompletableFuture<Void> 
internalSetSubscriptionLevelDispatchRate(String subName,
+                                                                               
DispatchRateImpl dispatchRate,
+                                                                               
boolean isGlobal) {
+        final DispatchRateImpl newDispatchRate = 
DispatchRateImpl.normalize(dispatchRate);
+        if (newDispatchRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setIsGlobal(isGlobal);
+                    topicPolicies.getSubscriptionPolicies()
+                            .computeIfAbsent(subName, k -> new 
SubscriptionPolicies())
+                            .setDispatchRate(newDispatchRate);
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+                });
+    }
+
+    protected CompletableFuture<Void> 
internalRemoveSubscriptionLevelDispatchRate(String subName, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                TopicPolicies topicPolicies = op.get();
+                SubscriptionPolicies sp = 
topicPolicies.getSubscriptionPolicies().get(subName);
+                if (sp == null) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                sp.setDispatchRate(null);
+                if (sp.checkEmpty()) {
+                    // cleanup empty SubscriptionPolicies
+                    topicPolicies.getSubscriptionPolicies().remove(subName, 
sp);
+                }
+                topicPolicies.setIsGlobal(isGlobal);

Review Comment:
   Why call the `topicPolicies.setIsGlobal(isGlobal)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to