jianyun8023 commented on a change in pull request #7646:
URL: https://github.com/apache/pulsar/pull/7646#discussion_r459879940



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return offlineTopicStats;
     }
 
+    protected void internalSetBacklogQuota(AsyncResponse asyncResponse, 
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        if (backlogQuotaType == null) {
+            backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
+        }
+        checkTopicLevelPolicyEnable();
+        TopicPolicies topicPolicies;
+        try {
+            topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        if (topicPolicies == null){
+            topicPolicies = new TopicPolicies();
+        }
+
+        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, 
topicPolicies);
+        if(!checkQuotas(backlogQuota,retentionPolicies)){
+            log.warn(
+                    "[{}] Failed to update backlog configuration for topic {}: 
conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Backlog Quota exceeds configured retention quota for 
topic. Please increase retention quota and retry");
+        }
+
+        if(backlogQuota!=null){
+            topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), 
backlogQuota);
+        }else {
+            topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
+        }
+        Map<String, BacklogQuota> backLogQuotaMap = 
topicPolicies.getBackLogQuotaMap();
+        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                .whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed updated backlog quota map",ex);
+                        asyncResponse.resume(new RestException(ex));
+                    } else {
+                        try {
+                            log.info("[{}] Successfully updated backlog quota 
map: namespace={}, topic={}, map={}",
+                                    clientAppId(),
+                                    namespaceName,
+                                    topicName.getLocalName(),
+                                    
jsonMapper().writeValueAsString(backLogQuotaMap));
+                        } catch (JsonProcessingException ignore) { }
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                });
+    }
+
+    private RetentionPolicies getRetentionPolicies(TopicName topicName, 
TopicPolicies topicPolicies) {
+        RetentionPolicies retentionPolicies = 
topicPolicies.getRetentionPolicies();
+        if (retentionPolicies == null){
+            try {
+                retentionPolicies = 
getNamespacePoliciesAsync(topicName.getNamespaceObject())
+                        .thenApply(policies -> policies.retention_policies)
+                        .get(1L, TimeUnit.SECONDS);
+            } catch (Exception e) {
+               throw new RestException(e);
+            }
+        }
+        return retentionPolicies;
+    }
+
+    protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
+            BacklogQuota.BacklogQuotaType backlogQuotaType) {
+        internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
+    }
+
+    private boolean checkQuotas(BacklogQuota quota, RetentionPolicies 
retention) {
+        if (retention==null||retention.getRetentionSizeInMB() == 0 ||
+                retention.getRetentionSizeInMB() == -1) {
+            return true;
+        }
+        if (quota == null) {
+            quota = 
pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
+        }
+        if (quota.getLimit() >= ( retention.getRetentionSizeInMB() * 1024 * 
1024)) {
+            return false;
+        }
+        return true;
+    }
+

Review comment:
       Ok, I extracted it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to