This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b91d0c4  [Broker] Make PersistentTopicsBase#internalSetBacklogQuota async (#14051)
b91d0c4 is described below

commit b91d0c45abdc40980830ec43718d5be4771d2603
Author: Zike Yang <[email protected]>
AuthorDate: Tue Feb 8 23:45:31 2022 +0800

    [Broker] Make PersistentTopicsBase#internalSetBacklogQuota async (#14051)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 89 +++++++++++-----------
 1 file changed, 45 insertions(+), 44 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9b421e0..aef1a55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2859,44 +2859,49 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
-                                           BacklogQuotaImpl backlogQuota, 
boolean isGlobal) {
-        validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, 
PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
+                                                              BacklogQuotaImpl 
backlogQuota, boolean isGlobal) {
         BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType 
== null
                 ? BacklogQuota.BacklogQuotaType.destination_storage : 
backlogQuotaType;
 
-        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
-            .thenCompose(op -> {
-                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
-                RetentionPolicies retentionPolicies = 
getRetentionPolicies(topicName, topicPolicies);
-                if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
-                    log.warn(
-                            "[{}] Failed to update backlog configuration for 
topic {}: conflicts with retention quota",
-                            clientAppId(), topicName);
-                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Backlog Quota exceeds configured retention quota 
for topic. "
-                                    + "Please increase retention quota and 
retry"));
-                }
-
-                if (backlogQuota != null) {
-                    
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), 
backlogQuota);
-                } else {
-                    
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
-                }
-                Map<String, BacklogQuotaImpl> backLogQuotaMap = 
topicPolicies.getBackLogQuotaMap();
-                topicPolicies.setIsGlobal(isGlobal);
-                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
-                    .thenRun(() -> {
-                        try {
-                            log.info("[{}] Successfully updated backlog quota 
map: namespace={}, topic={}, map={}",
-                                    clientAppId(),
-                                    namespaceName,
-                                    topicName.getLocalName(),
-                                    
jsonMapper().writeValueAsString(backLogQuotaMap));
-                        } catch (JsonProcessingException ignore) { }
+        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.BACKLOG, PolicyOperation.WRITE)
+                .thenAccept(__ -> validatePoliciesReadOnlyAccess())
+                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal))
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    return getRetentionPoliciesAsync(topicName, topicPolicies)
+                            .thenCompose(retentionPolicies -> {
+                                if (!checkBacklogQuota(backlogQuota, 
retentionPolicies)) {
+                                    log.warn(
+                                            "[{}] Failed to update backlog 
configuration for topic {}: conflicts with"
+                                                    + " retention quota",
+                                            clientAppId(), topicName);
+                                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                                            "Backlog Quota exceeds configured 
retention quota for topic. "
+                                                    + "Please increase 
retention quota and retry"));
+                                }
+                                if (backlogQuota != null) {
+                                    
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), 
backlogQuota);
+                                } else {
+                                    
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
+                                }
+                                Map<String, BacklogQuotaImpl> backLogQuotaMap 
= topicPolicies.getBackLogQuotaMap();
+                                topicPolicies.setIsGlobal(isGlobal);
+                                return pulsar().getTopicPoliciesService()
+                                        .updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                                        .thenRun(() -> {
+                                            try {
+                                                log.info(
+                                                        "[{}] Successfully 
updated backlog quota map: namespace={}, "
+                                                                + "topic={}, 
map={}",
+                                                        clientAppId(),
+                                                        namespaceName,
+                                                        
topicName.getLocalName(),
+                                                        
jsonMapper().writeValueAsString(backLogQuotaMap));
+                                            } catch (JsonProcessingException 
ignore) {
+                                            }
+                                        });
+                            });
                 });
-            });
     }
 
     protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds) {
@@ -2994,18 +2999,14 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
-    private RetentionPolicies getRetentionPolicies(TopicName topicName, 
TopicPolicies topicPolicies) {
+    private CompletableFuture<RetentionPolicies> 
getRetentionPoliciesAsync(TopicName topicName,
+                                                                           
TopicPolicies topicPolicies) {
         RetentionPolicies retentionPolicies = 
topicPolicies.getRetentionPolicies();
-        if (retentionPolicies == null){
-            try {
-                retentionPolicies = 
getNamespacePoliciesAsync(topicName.getNamespaceObject())
-                        .thenApply(policies -> policies.retention_policies)
-                        .get(1L, TimeUnit.SECONDS);
-            } catch (Exception e) {
-               throw new RestException(e);
-            }
+        if (retentionPolicies != null) {
+            return CompletableFuture.completedFuture(retentionPolicies);
         }
-        return retentionPolicies;
+        return getNamespacePoliciesAsync(topicName.getNamespaceObject())
+                .thenApply(policies -> policies.retention_policies);
     }
 
     protected CompletableFuture<RetentionPolicies> 
internalGetRetention(boolean applied, boolean isGlobal) {

Reply via email to