codelipenghui commented on a change in pull request #7646:
URL: https://github.com/apache/pulsar/pull/7646#discussion_r459477823



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return offlineTopicStats;
     }
 
+    protected void internalSetBacklogQuota(AsyncResponse asyncResponse, 
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        if (backlogQuotaType == null) {
+            backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
+        }
+        checkTopicLevelPolicyEnable();
+        TopicPolicies topicPolicies;
+        try {
+            topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        if (topicPolicies == null){
+            topicPolicies = new TopicPolicies();
+        }
+
+        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, 
topicPolicies);
+        if(!checkQuotas(backlogQuota,retentionPolicies)){
+            log.warn(
+                    "[{}] Failed to update backlog configuration for topic {}: 
conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Backlog Quota exceeds configured retention quota for 
topic. Please increase retention quota and retry");
+        }
+
+        if(backlogQuota!=null){

Review comment:
       ```suggestion
           if(backlogQuota != null){
   ```
   Please check all other occurrences of this spacing issue as well.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return offlineTopicStats;
     }
 
+    protected void internalSetBacklogQuota(AsyncResponse asyncResponse, 
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        if (backlogQuotaType == null) {
+            backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
+        }
+        checkTopicLevelPolicyEnable();
+        TopicPolicies topicPolicies;
+        try {
+            topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        if (topicPolicies == null){
+            topicPolicies = new TopicPolicies();
+        }
+
+        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, 
topicPolicies);
+        if(!checkQuotas(backlogQuota,retentionPolicies)){
+            log.warn(
+                    "[{}] Failed to update backlog configuration for topic {}: 
conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Backlog Quota exceeds configured retention quota for 
topic. Please increase retention quota and retry");

Review comment:
       asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                       "Backlog Quota exceeds configured retention quota for 
topic. Please increase retention quota and retry"))

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return offlineTopicStats;
     }
 
+    protected void internalSetBacklogQuota(AsyncResponse asyncResponse, 
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        if (backlogQuotaType == null) {
+            backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
+        }
+        checkTopicLevelPolicyEnable();
+        TopicPolicies topicPolicies;
+        try {
+            topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        if (topicPolicies == null){
+            topicPolicies = new TopicPolicies();
+        }
+
+        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, 
topicPolicies);
+        if(!checkQuotas(backlogQuota,retentionPolicies)){
+            log.warn(
+                    "[{}] Failed to update backlog configuration for topic {}: 
conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Backlog Quota exceeds configured retention quota for 
topic. Please increase retention quota and retry");
+        }
+
+        if(backlogQuota!=null){
+            topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), 
backlogQuota);
+        }else {
+            topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
+        }
+        Map<String, BacklogQuota> backLogQuotaMap = 
topicPolicies.getBackLogQuotaMap();
+        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                .whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed updated backlog quota map",ex);
+                        asyncResponse.resume(new RestException(ex));
+                    } else {
+                        try {
+                            log.info("[{}] Successfully updated backlog quota 
map: namespace={}, topic={}, map={}",
+                                    clientAppId(),
+                                    namespaceName,
+                                    topicName.getLocalName(),
+                                    
jsonMapper().writeValueAsString(backLogQuotaMap));
+                        } catch (JsonProcessingException ignore) { }
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                });
+    }
+
+    private RetentionPolicies getRetentionPolicies(TopicName topicName, 
TopicPolicies topicPolicies) {
+        RetentionPolicies retentionPolicies = 
topicPolicies.getRetentionPolicies();
+        if (retentionPolicies == null){
+            try {
+                retentionPolicies = 
getNamespacePoliciesAsync(topicName.getNamespaceObject())
+                        .thenApply(policies -> policies.retention_policies)
+                        .get(1L, TimeUnit.SECONDS);
+            } catch (Exception e) {
+               throw new RestException(e);
+            }
+        }
+        return retentionPolicies;
+    }
+
+    protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
+            BacklogQuota.BacklogQuotaType backlogQuotaType) {
+        internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
+    }
+
+    private boolean checkQuotas(BacklogQuota quota, RetentionPolicies 
retention) {
+        if (retention==null||retention.getRetentionSizeInMB() == 0 ||
+                retention.getRetentionSizeInMB() == -1) {
+            return true;
+        }
+        if (quota == null) {
+            quota = 
pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
+        }
+        if (quota.getLimit() >= ( retention.getRetentionSizeInMB() * 1024 * 
1024)) {
+            return false;
+        }
+        return true;
+    }
+

Review comment:
       This should be kept consistent with checkQuotas in NamespacesBase. You 
can move this method to AdminResource so that it can be used in both 
NamespacesBase and PersistentTopicsBase.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -977,6 +980,50 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/backlogQuotaMap")

Review comment:
       ```suggestion
       @Path("/{tenant}/{namespace}/{topic}/backlogQuota")
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to