codelipenghui commented on a change in pull request #7646:
URL: https://github.com/apache/pulsar/pull/7646#discussion_r459477823
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return offlineTopicStats;
}
+ protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ if (backlogQuotaType == null) {
+ backlogQuotaType =
BacklogQuota.BacklogQuotaType.destination_storage;
+ }
+ checkTopicLevelPolicyEnable();
+ TopicPolicies topicPolicies;
+ try {
+ topicPolicies =
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.", topicName);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ if (topicPolicies == null){
+ topicPolicies = new TopicPolicies();
+ }
+
+ RetentionPolicies retentionPolicies = getRetentionPolicies(topicName,
topicPolicies);
+ if(!checkQuotas(backlogQuota,retentionPolicies)){
+ log.warn(
+ "[{}] Failed to update backlog configuration for topic {}:
conflicts with retention quota",
+ clientAppId(), topicName);
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Backlog Quota exceeds configured retention quota for
topic. Please increase retention quota and retry");
+ }
+
+ if(backlogQuota!=null){
Review comment:
```suggestion
if(backlogQuota != null){
```
Please check all similar occurrences of missing spaces around operators.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return offlineTopicStats;
}
+ protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ if (backlogQuotaType == null) {
+ backlogQuotaType =
BacklogQuota.BacklogQuotaType.destination_storage;
+ }
+ checkTopicLevelPolicyEnable();
+ TopicPolicies topicPolicies;
+ try {
+ topicPolicies =
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.", topicName);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ if (topicPolicies == null){
+ topicPolicies = new TopicPolicies();
+ }
+
+ RetentionPolicies retentionPolicies = getRetentionPolicies(topicName,
topicPolicies);
+ if(!checkQuotas(backlogQuota,retentionPolicies)){
+ log.warn(
+ "[{}] Failed to update backlog configuration for topic {}:
conflicts with retention quota",
+ clientAppId(), topicName);
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Backlog Quota exceeds configured retention quota for
topic. Please increase retention quota and retry");
Review comment:
This method takes an AsyncResponse, so please resume it with the error instead of throwing:
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for
topic. Please increase retention quota and retry"))
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2000,6 +2004,94 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return offlineTopicStats;
}
+ protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ if (backlogQuotaType == null) {
+ backlogQuotaType =
BacklogQuota.BacklogQuotaType.destination_storage;
+ }
+ checkTopicLevelPolicyEnable();
+ TopicPolicies topicPolicies;
+ try {
+ topicPolicies =
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.", topicName);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ if (topicPolicies == null){
+ topicPolicies = new TopicPolicies();
+ }
+
+ RetentionPolicies retentionPolicies = getRetentionPolicies(topicName,
topicPolicies);
+ if(!checkQuotas(backlogQuota,retentionPolicies)){
+ log.warn(
+ "[{}] Failed to update backlog configuration for topic {}:
conflicts with retention quota",
+ clientAppId(), topicName);
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Backlog Quota exceeds configured retention quota for
topic. Please increase retention quota and retry");
+ }
+
+ if(backlogQuota!=null){
+ topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(),
backlogQuota);
+ }else {
+ topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
+ }
+ Map<String, BacklogQuota> backLogQuotaMap =
topicPolicies.getBackLogQuotaMap();
+ pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
+ .whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Failed updated backlog quota map",ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ try {
+ log.info("[{}] Successfully updated backlog quota
map: namespace={}, topic={}, map={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+
jsonMapper().writeValueAsString(backLogQuotaMap));
+ } catch (JsonProcessingException ignore) { }
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ private RetentionPolicies getRetentionPolicies(TopicName topicName,
TopicPolicies topicPolicies) {
+ RetentionPolicies retentionPolicies =
topicPolicies.getRetentionPolicies();
+ if (retentionPolicies == null){
+ try {
+ retentionPolicies =
getNamespacePoliciesAsync(topicName.getNamespaceObject())
+ .thenApply(policies -> policies.retention_policies)
+ .get(1L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RestException(e);
+ }
+ }
+ return retentionPolicies;
+ }
+
+ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
+ BacklogQuota.BacklogQuotaType backlogQuotaType) {
+ internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
+ }
+
+ private boolean checkQuotas(BacklogQuota quota, RetentionPolicies
retention) {
+ if (retention==null||retention.getRetentionSizeInMB() == 0 ||
+ retention.getRetentionSizeInMB() == -1) {
+ return true;
+ }
+ if (quota == null) {
+ quota =
pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
+ }
+ if (quota.getLimit() >= ( retention.getRetentionSizeInMB() * 1024 *
1024)) {
+ return false;
+ }
+ return true;
+ }
+
Review comment:
This should be kept consistent with checkQuotas in NamespacesBase. You
could move this method to AdminResource so that it can be used in both
NamespacesBase and PersistentTopicsBase.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -977,6 +980,50 @@ public PersistentOfflineTopicStats getBacklog(
return internalGetBacklog(authoritative);
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/backlogQuotaMap")
Review comment:
```suggestion
@Path("/{tenant}/{namespace}/{topic}/backlogQuota")
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org