This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b91d0c4 [Broker] Make PersistentTopicsBase#internalSetBacklogQuota async (#14051)
b91d0c4 is described below
commit b91d0c45abdc40980830ec43718d5be4771d2603
Author: Zike Yang <[email protected]>
AuthorDate: Tue Feb 8 23:45:31 2022 +0800
[Broker] Make PersistentTopicsBase#internalSetBacklogQuota async (#14051)
---
.../broker/admin/impl/PersistentTopicsBase.java | 89 +++++++++++-----------
1 file changed, 45 insertions(+), 44 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9b421e0..aef1a55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2859,44 +2859,49 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void>
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
- BacklogQuotaImpl backlogQuota,
boolean isGlobal) {
- validateTopicPolicyOperation(topicName, PolicyName.BACKLOG,
PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
-
+ BacklogQuotaImpl
backlogQuota, boolean isGlobal) {
BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType
== null
? BacklogQuota.BacklogQuotaType.destination_storage :
backlogQuotaType;
- return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
- .thenCompose(op -> {
- TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
- RetentionPolicies retentionPolicies =
getRetentionPolicies(topicName, topicPolicies);
- if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
- log.warn(
- "[{}] Failed to update backlog configuration for
topic {}: conflicts with retention quota",
- clientAppId(), topicName);
- return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
- "Backlog Quota exceeds configured retention quota
for topic. "
- + "Please increase retention quota and
retry"));
- }
-
- if (backlogQuota != null) {
-
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(),
backlogQuota);
- } else {
-
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
- }
- Map<String, BacklogQuotaImpl> backLogQuotaMap =
topicPolicies.getBackLogQuotaMap();
- topicPolicies.setIsGlobal(isGlobal);
- return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
- .thenRun(() -> {
- try {
- log.info("[{}] Successfully updated backlog quota
map: namespace={}, topic={}, map={}",
- clientAppId(),
- namespaceName,
- topicName.getLocalName(),
-
jsonMapper().writeValueAsString(backLogQuotaMap));
- } catch (JsonProcessingException ignore) { }
+ return validateTopicPolicyOperationAsync(topicName,
PolicyName.BACKLOG, PolicyOperation.WRITE)
+ .thenAccept(__ -> validatePoliciesReadOnlyAccess())
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal))
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+ return getRetentionPoliciesAsync(topicName, topicPolicies)
+ .thenCompose(retentionPolicies -> {
+ if (!checkBacklogQuota(backlogQuota,
retentionPolicies)) {
+ log.warn(
+ "[{}] Failed to update backlog
configuration for topic {}: conflicts with"
+ + " retention quota",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Backlog Quota exceeds configured
retention quota for topic. "
+ + "Please increase
retention quota and retry"));
+ }
+ if (backlogQuota != null) {
+
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(),
backlogQuota);
+ } else {
+
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
+ }
+ Map<String, BacklogQuotaImpl> backLogQuotaMap
= topicPolicies.getBackLogQuotaMap();
+ topicPolicies.setIsGlobal(isGlobal);
+ return pulsar().getTopicPoliciesService()
+ .updateTopicPoliciesAsync(topicName,
topicPolicies)
+ .thenRun(() -> {
+ try {
+ log.info(
+ "[{}] Successfully
updated backlog quota map: namespace={}, "
+ + "topic={},
map={}",
+ clientAppId(),
+ namespaceName,
+
topicName.getLocalName(),
+
jsonMapper().writeValueAsString(backLogQuotaMap));
+ } catch (JsonProcessingException
ignore) {
+ }
+ });
+ });
});
- });
}
protected CompletableFuture<Void>
internalSetReplicationClusters(List<String> clusterIds) {
@@ -2994,18 +2999,14 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- private RetentionPolicies getRetentionPolicies(TopicName topicName,
TopicPolicies topicPolicies) {
+ private CompletableFuture<RetentionPolicies>
getRetentionPoliciesAsync(TopicName topicName,
+
TopicPolicies topicPolicies) {
RetentionPolicies retentionPolicies =
topicPolicies.getRetentionPolicies();
- if (retentionPolicies == null){
- try {
- retentionPolicies =
getNamespacePoliciesAsync(topicName.getNamespaceObject())
- .thenApply(policies -> policies.retention_policies)
- .get(1L, TimeUnit.SECONDS);
- } catch (Exception e) {
- throw new RestException(e);
- }
+ if (retentionPolicies != null) {
+ return CompletableFuture.completedFuture(retentionPolicies);
}
- return retentionPolicies;
+ return getNamespacePoliciesAsync(topicName.getNamespaceObject())
+ .thenApply(policies -> policies.retention_policies);
}
protected CompletableFuture<RetentionPolicies>
internalGetRetention(boolean applied, boolean isGlobal) {