This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 353c72da2f68890fd33c22bbb0a7c152287a5ec6 Author: Marvin Cai <[email protected]> AuthorDate: Thu Aug 5 20:41:18 2021 -0700 Fix time based backlog quota. (#11509) Fixes #11404 ### Motivation The time-based backlog quota type message_age is set separately, but when checking the backlog we only check against the destination_storage type. The fix is to loop through all BacklogQuotaType values when checking whether the backlog quota is exceeded. ### Modification * Added unit tests. * Added default implementations to keep the Admin Topic/Namespace backlog-quota-related APIs backward compatible. (cherry picked from commit e82df7cb34dee301407d03d14f08368e63b792b5) --- .../apache/pulsar/broker/admin/AdminResource.java | 6 +- .../broker/admin/impl/PersistentTopicsBase.java | 43 ++-- .../pulsar/broker/service/BacklogQuotaManager.java | 23 ++- .../apache/pulsar/broker/service/ServerCnx.java | 36 ++-- .../org/apache/pulsar/broker/service/Topic.java | 4 +- .../service/nonpersistent/NonPersistentTopic.java | 4 +- .../broker/service/persistent/PersistentTopic.java | 18 +- .../stats/prometheus/NamespaceStatsAggregator.java | 7 +- .../org/apache/pulsar/broker/ConfigHelper.java | 20 +- .../broker/admin/TopicPoliciesDisableTest.java | 4 +- .../pulsar/broker/admin/TopicPoliciesTest.java | 230 +++++++++++++++++++-- .../broker/service/BacklogQuotaManagerTest.java | 18 +- .../org/apache/pulsar/client/admin/Namespaces.java | 28 ++- .../org/apache/pulsar/client/admin/Topics.java | 19 +- .../policies/data/impl/BacklogQuotaImpl.java | 5 +- .../client/admin/internal/NamespacesImpl.java | 22 +- .../pulsar/client/admin/internal/TopicsImpl.java | 10 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 39 +++- .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 11 +- .../org/apache/pulsar/admin/cli/CmdTopics.java | 15 +- site2/docs/cookbooks-retention-expiry.md | 12 +- site2/docs/reference-pulsar-admin.md | 38 ++++ 22 files changed, 472 insertions(+), 140 deletions(-) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 568d569..7a3c267 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -358,8 +358,10 @@ public abstract class AdminResource extends PulsarWebResource { } - protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) { - return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath); + protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath, + BacklogQuota.BacklogQuotaType backlogQuotaType) { + return pulsar().getBrokerService().getBacklogQuotaManager() + .getBacklogQuota(namespace, namespacePath, backlogQuotaType); } protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7540206..9bbf018 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2625,14 +2625,17 @@ public class PersistentTopicsBase extends AdminResource { quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map; if (quotaMap.isEmpty()) { String namespace = namespaceName.toString(); - quotaMap.put( - BacklogQuota.BacklogQuotaType.destination_storage, - namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace)) - ); + for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) { + quotaMap.put( + backlogQuotaType, + namespaceBacklogQuota(namespace, + AdminResource.path(POLICIES, 
namespace), backlogQuotaType) + ); + } } } return quotaMap; - }); + }); } protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType, @@ -2748,21 +2751,21 @@ public class PersistentTopicsBase extends AdminResource { return getTopicPoliciesAsyncWithRetry(topicName) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - BacklogQuota backlogQuota = - topicPolicies.getBackLogQuotaMap() - .get(BacklogQuota.BacklogQuotaType.destination_storage.name()); - if (backlogQuota == null) { - Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); - backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); - } - if (!checkBacklogQuota(backlogQuota, retention)) { - log.warn( - "[{}] Failed to update retention quota configuration for topic {}: " - + "conflicts with retention quota", - clientAppId(), topicName); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, - "Retention Quota must exceed configured backlog quota for topic. " - + "Please increase retention quota and retry")); + for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) { + BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name()); + if (backlogQuota == null) { + Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); + backlogQuota = policies.backlog_quota_map.get(backlogQuotaType); + } + if (!checkBacklogQuota(backlogQuota, retention)) { + log.warn( + "[{}] Failed to update retention quota configuration for topic {}: " + + "conflicts with retention quota", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Retention Quota must exceed configured backlog quota for topic. 
" + + "Please increase retention quota and retry")); + } } topicPolicies.setRetentionPolicies(retention); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index fa0d24b..bbb3ddf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -54,7 +54,8 @@ public class BacklogQuotaManager { public BacklogQuotaManager(PulsarService pulsar) { this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled(); this.defaultQuota = BacklogQuotaImpl.builder() - .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024) + .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() + * BacklogQuotaImpl.BYTES_IN_GIGABYTE) .limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond()) .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy()) .build(); @@ -66,11 +67,11 @@ public class BacklogQuotaManager { return this.defaultQuota; } - public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath) { + public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath, BacklogQuotaType backlogQuotaType) { try { return zkCache.get(policyPath) .map(p -> (BacklogQuotaImpl) p.backlog_quota_map - .getOrDefault(BacklogQuotaType.destination_storage, defaultQuota)) + .getOrDefault(backlogQuotaType, defaultQuota)) .orElse(defaultQuota); } catch (Exception e) { log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e); @@ -78,30 +79,30 @@ public class BacklogQuotaManager { } } - public BacklogQuotaImpl getBacklogQuota(TopicName topicName) { + public 
BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) { String policyPath = AdminResource.path(POLICIES, topicName.getNamespace()); if (!isTopicLevelPoliciesEnable) { - return getBacklogQuota(topicName.getNamespace(), policyPath); + return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType); } try { return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName)) .map(TopicPolicies::getBackLogQuotaMap) - .map(map -> map.get(BacklogQuotaType.destination_storage.name())) - .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath)); + .map(map -> map.get(backlogQuotaType.name())) + .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType)); } catch (Exception e) { log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e); } - return getBacklogQuota(topicName.getNamespace(), policyPath); + return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType); } public long getBacklogQuotaLimitInSize(TopicName topicName) { - return getBacklogQuota(topicName).getLimitSize(); + return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize(); } public int getBacklogQuotaLimitInTime(TopicName topicName) { - return getBacklogQuota(topicName).getLimitTime(); + return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime(); } /** @@ -112,7 +113,7 @@ public class BacklogQuotaManager { public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType, boolean preciseTimeBasedBacklogQuotaCheck) { TopicName topicName = TopicName.get(persistentTopic.getName()); - BacklogQuota quota = getBacklogQuota(topicName); + BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType); log.info("Backlog quota type {} exceeded for topic [{}]. 
Applying [{}] policy", backlogQuotaType, persistentTopic.getName(), quota.getPolicy()); switch (quota.getPolicy()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ac50c53..511d5a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1182,23 +1182,27 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded - // on topic - if (topic.isBacklogQuotaExceeded(producerName)) { - IllegalStateException illegalStateException = new IllegalStateException( - "Cannot create producer on topic with backlog quota exceeded"); - BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy(); - if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { - commandSender.sendErrorResponse(requestId, - ServerError.ProducerBlockedQuotaExceededError, - illegalStateException.getMessage()); - } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { - commandSender.sendErrorResponse(requestId, - ServerError.ProducerBlockedQuotaExceededException, - illegalStateException.getMessage()); + // on topic for size based limit and time based limit + for (BacklogQuota.BacklogQuotaType backlogQuotaType : + BacklogQuota.BacklogQuotaType.values()) { + if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) { + IllegalStateException illegalStateException = new IllegalStateException( + "Cannot create producer on topic with backlog quota exceeded"); + BacklogQuota.RetentionPolicy retentionPolicy = topic + .getBacklogQuota(backlogQuotaType).getPolicy(); + if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { + 
commandSender.sendErrorResponse(requestId, + ServerError.ProducerBlockedQuotaExceededError, + illegalStateException.getMessage()); + } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { + commandSender.sendErrorResponse(requestId, + ServerError.ProducerBlockedQuotaExceededException, + illegalStateException.getMessage()); + } + producerFuture.completeExceptionally(illegalStateException); + producers.remove(producerId, producerFuture); + return; } - producerFuture.completeExceptionally(illegalStateException); - producers.remove(producerId, producerFuture); - return; } // Check whether the producer will publish encrypted messages or not diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index d1e4506..eaed76c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -176,7 +176,7 @@ public interface Topic { CompletableFuture<Void> onPoliciesUpdate(Policies data); - boolean isBacklogQuotaExceeded(String producerName); + boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType); boolean isEncryptionRequired(); @@ -184,7 +184,7 @@ public interface Topic { boolean isReplicated(); - BacklogQuota getBacklogQuota(); + BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType); void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 5224eaa..37680d4 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -965,7 +965,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { * @return Backlog quota for topic */ @Override - public BacklogQuota getBacklogQuota() { + public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) { // No-op throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic"); } @@ -975,7 +975,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { * @return quota exceeded status for blocking producer creation */ @Override - public boolean isBacklogQuotaExceeded(String producerName) { + public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) { // No-op return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2f9a91d..73c28f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2369,9 +2369,9 @@ public class PersistentTopic extends AbstractTopic * @return Backlog quota for topic */ @Override - public BacklogQuota getBacklogQuota() { + public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) { TopicName topicName = TopicName.get(this.getName()); - return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName); + return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType); } /** @@ -2379,17 +2379,19 @@ public class PersistentTopic extends AbstractTopic * @return quota exceeded status for blocking producer 
creation */ @Override - public boolean isBacklogQuotaExceeded(String producerName) { - BacklogQuota backlogQuota = getBacklogQuota(); + public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) { + BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType); if (backlogQuota != null) { BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy(); if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold - || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) - && (isSizeBacklogExceeded() || isTimeBacklogExceeded())) { - log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); - return true; + || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) { + if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded() + || backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){ + log.info("[{}] Backlog quota exceeded. 
Cannot create producer [{}]", this.getName(), producerName); + return true; + } } else { return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index c08641d..e041a01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; @@ -99,8 +100,10 @@ public class NamespaceStatsAggregator { stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize(); stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize(); stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize(); - stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize(); - stats.backlogQuotaLimitTime = topic.getBacklogQuota().getLimitTime(); + stats.backlogQuotaLimit = topic + .getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(); + stats.backlogQuotaLimitTime = topic + .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(); stats.managedLedgerStats.storageWriteLatencyBuckets .addAll(mlStats.getInternalAddEntryLatencyBuckets()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java index 86cf2aa..b929636 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java @@ -18,11 +18,12 @@ */ package org.apache.pulsar.broker; +import com.google.common.collect.ImmutableMap; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; -import java.util.Collections; import java.util.Map; public class ConfigHelper { @@ -30,13 +31,22 @@ public class ConfigHelper { public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap(ServiceConfiguration configuration) { - return Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage, - backlogQuota(configuration)); + return ImmutableMap.of(BacklogQuota.BacklogQuotaType.destination_storage, + sizeBacklogQuota(configuration), + BacklogQuota.BacklogQuotaType.message_age, + timeBacklogQuota(configuration)); } - public static BacklogQuota backlogQuota(ServiceConfiguration configuration) { + public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) { return BacklogQuota.builder() - .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024) + .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE) + .retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy()) + .build(); + } + + public static BacklogQuota timeBacklogQuota(ServiceConfiguration configuration) { + return BacklogQuota.builder() + .limitTime(configuration.getBacklogQuotaDefaultLimitSecond()) .retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy()) .build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java index 5e9967b..e9ec190 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java @@ -78,14 +78,14 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest { log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405); } try { - admin.topics().removeBacklogQuota(testTopic); + admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index bfb7715..b2a6e83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -122,7 +122,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test - public void testSetBacklogQuota() throws Exception { + public void testSetSizeBasedBacklogQuota() throws Exception { BacklogQuota backlogQuota = BacklogQuota.builder() .limitSize(1024) @@ -130,7 +130,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + 
admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() @@ -138,7 +138,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); - BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic); Assert.assertEquals(backlogQuota, backlogQuotaInManager); @@ -146,14 +147,37 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test - public void testRemoveBacklogQuota() throws Exception { + public void testSetTimeBasedBacklogQuota() throws Exception { + + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(1000) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota)); + + BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age); + + Assert.assertEquals(backlogQuota, backlogQuotaInManager); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test + public void testRemoveSizeBasedBacklogQuota() throws Exception { BacklogQuota 
backlogQuota = BacklogQuota.builder() .limitSize(1024) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() @@ -161,16 +185,18 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); - BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic); Assert.assertEquals(backlogQuota, backlogQuotaInManager); - admin.topics().removeBacklogQuota(testTopic); + admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage); Awaitility.await() .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic) .get(BacklogQuota.BacklogQuotaType.destination_storage))); - backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); + backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager, testTopic); Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager); @@ -179,7 +205,37 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test - public void 
testCheckBacklogQuota() throws Exception { + public void testRemoveTimeBasedBacklogQuota() throws Exception { + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(1000) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota)); + + BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age); + Assert.assertEquals(backlogQuota, backlogQuotaInManager); + + admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await() + .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age))); + + backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age); + Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test + public void testCheckSizeBasedBacklogQuota() throws Exception { RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10); String namespace = TopicName.get(testTopic).getNamespace(); admin.namespaces().setRetention(namespace, retentionPolicies); @@ -193,7 +249,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 412); @@ -205,7 +261,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 412); @@ -216,7 +272,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); BacklogQuota finalBacklogQuota = backlogQuota; Awaitility.await() @@ -226,8 +282,56 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(testTopic, true); } + @Test + public void testCheckTimeBasedBacklogQuota() throws Exception { + RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10); + String namespace = TopicName.get(testTopic).getNamespace(); + admin.namespaces().setRetention(namespace, retentionPolicies); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies)); + + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + try { + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + 
Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60 + 1) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + try { + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60 - 1) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + + BacklogQuota finalBacklogQuota = backlogQuota; + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), finalBacklogQuota)); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + @Test(timeOut = 20000) - public void testGetBacklogQuotaApplied() throws Exception { + public void testGetSizeBasedBacklogQuotaApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap()); @@ -236,6 +340,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap); BacklogQuota namespaceQuota = BacklogQuota.builder() .limitSize(30) + .limitTime(10) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) .build(); @@ -243,23 +348,67 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Awaitility.await().untilAsserted(() -> 
assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap(); namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota); + namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, BacklogQuota.builder() + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build()); assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap); BacklogQuota topicQuota = BacklogQuota.builder() .limitSize(40) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); - admin.topics().setBacklogQuota(topic, topicQuota); + admin.topics().setBacklogQuota(topic, topicQuota, BacklogQuota.BacklogQuotaType.destination_storage); Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty())); Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap(); topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota); assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap); admin.namespaces().removeBacklogQuota(myNamespace); - admin.topics().removeBacklogQuota(topic); - Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); + admin.topics().removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.destination_storage); + Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace) + .get(BacklogQuota.BacklogQuotaType.destination_storage) == null)); Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty())); + assertTrue(admin.topics().getBacklogQuotaMap(topic, true) + .get(BacklogQuota.BacklogQuotaType.destination_storage) == null); + } + + @Test(timeOut = 20000) + public void testGetTimeBasedBacklogQuotaApplied() throws Exception { + final String topic = 
testTopic + UUID.randomUUID(); + pulsarClient.newProducer().topic(topic).create().close(); + assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap()); + assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap()); + Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf); assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap); + BacklogQuota namespaceQuota = BacklogQuota.builder() + .limitTime(30) + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) + .build(); + + admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); + Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap(); + namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, namespaceQuota); + namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder() + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build()); + assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap); + + BacklogQuota topicQuota = BacklogQuota.builder() + .limitTime(40) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + admin.topics().setBacklogQuota(topic, topicQuota, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty())); + Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap(); + topicQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, topicQuota); + assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap); + + admin.namespaces().removeBacklogQuota(myNamespace, BacklogQuota.BacklogQuotaType.message_age); + admin.topics().removeBacklogQuota(topic, 
BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace) + .get(BacklogQuota.BacklogQuotaType.message_age) == null)); + Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty())); + assertTrue(admin.topics().getBacklogQuotaMap(topic, true) + .get(BacklogQuota.BacklogQuotaType.message_age) == null); } @Test @@ -276,7 +425,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 412); @@ -288,13 +437,13 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test - public void testCheckRetention() throws Exception { + public void testCheckRetentionSizeBasedQuota() throws Exception { BacklogQuota backlogQuota = BacklogQuota.builder() .limitSize(10 * 1024 * 1024) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); @@ -329,6 +478,47 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test + public void testCheckRetentionTimeBasedQuota() throws Exception { + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + + admin.topics().setBacklogQuota(testTopic, 
backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota)); + + RetentionPolicies retention = new RetentionPolicies(10, 10); + log.info("Retention: {} will set to the topic: {}", retention, testTopic); + try { + admin.topics().setRetention(testTopic, retention); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + retention = new RetentionPolicies(9, 10); + log.info("Retention: {} will set to the topic: {}", retention, testTopic); + try { + admin.topics().setRetention(testTopic, retention); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + retention = new RetentionPolicies(12, 10); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + admin.topics().setRetention(testTopic, retention); + + RetentionPolicies finalRetention = retention; + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), finalRetention)); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test public void testSetRetention() throws Exception { RetentionPolicies retention = new RetentionPolicies(60, 1024); log.info("Retention: {} will set to the topic: {}", retention, testTopic); @@ -2186,7 +2376,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 552ba5a..560cd9e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -398,10 +398,9 @@ public class BacklogQuotaManagerTest { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(20 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -441,10 +440,9 @@ public class BacklogQuotaManagerTest { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(20 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -519,10 +517,9 @@ public class BacklogQuotaManagerTest { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(10 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); @@ -583,10 +580,9 @@ public class BacklogQuotaManagerTest { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(20 
* 1024) .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); @@ -1090,10 +1086,9 @@ public class BacklogQuotaManagerTest { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder() - .limitSize(10 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) .statsInterval(0, TimeUnit.SECONDS).build(); @@ -1157,10 +1152,9 @@ public class BacklogQuotaManagerTest { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder() - .limitSize(15 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) .statsInterval(0, TimeUnit.SECONDS).build(); final String topic1 = "persistent://prop/quotahold/exceptandunblock2"; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 6e66d3b..f6f8654 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -1529,7 +1529,12 @@ public interface Namespaces { * @throws PulsarAdminException * Unexpected error */ - void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException; + void 
setBacklogQuota(String namespace, BacklogQuota backlogQuota, BacklogQuota.BacklogQuotaType backlogQuotaType) + throws PulsarAdminException; + + default void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException { + setBacklogQuota(namespace, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Set a backlog quota for all the topics on a namespace asynchronously. @@ -1554,7 +1559,12 @@ public interface Namespaces { * @param backlogQuota * the new BacklogQuota */ - CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota); + CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType); + + default CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota) { + return setBacklogQuotaAsync(namespace, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Remove a backlog quota policy from a namespace. @@ -1573,7 +1583,12 @@ public interface Namespaces { * @throws PulsarAdminException * Unexpected error */ - void removeBacklogQuota(String namespace) throws PulsarAdminException; + void removeBacklogQuota(String namespace, BacklogQuota.BacklogQuotaType backlogQuotaType) + throws PulsarAdminException; + + default void removeBacklogQuota(String namespace) throws PulsarAdminException { + removeBacklogQuota(namespace, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Remove a backlog quota policy from a namespace asynchronously. 
@@ -1585,7 +1600,12 @@ public interface Namespaces { * @param namespace * Namespace name */ - CompletableFuture<Void> removeBacklogQuotaAsync(String namespace); + CompletableFuture<Void> removeBacklogQuotaAsync(String namespace, BacklogQuota.BacklogQuotaType backlogQuotaType); + + default CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) { + return removeBacklogQuotaAsync(namespace, BacklogQuota.BacklogQuotaType.destination_storage); + } + /** * Remove the persistence configuration on a namespace. diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 9f7dc5e..c48a44b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1584,7 +1584,8 @@ public interface Topics { * @throws PulsarAdminException * Unexpected error */ - Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException; + Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) + throws PulsarAdminException; /** * Get applied backlog quota map for a topic. 
@@ -1617,6 +1618,7 @@ public interface Topics { * Topic name * @param backlogQuota * the new BacklogQuota + * @param backlogQuotaType * * @throws NotAuthorizedException * Don't have admin permission @@ -1625,7 +1627,12 @@ public interface Topics { * @throws PulsarAdminException * Unexpected error */ - void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException; + void setBacklogQuota(String topic, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType) throws PulsarAdminException; + + default void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException { + setBacklogQuota(topic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Remove a backlog quota policy from a topic. @@ -1633,6 +1640,7 @@ public interface Topics { * * @param topic * Topic name + * @param backlogQuotaType * * @throws NotAuthorizedException * Don't have admin permission @@ -1641,7 +1649,12 @@ public interface Topics { * @throws PulsarAdminException * Unexpected error */ - void removeBacklogQuota(String topic) throws PulsarAdminException; + void removeBacklogQuota(String topic, BacklogQuota.BacklogQuotaType backlogQuotaType) throws PulsarAdminException; + + default void removeBacklogQuota(String topic) + throws PulsarAdminException { + removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Get the delayed delivery policy applied for a specified topic. 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java index f60f4ca..79e0af2 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java @@ -27,6 +27,9 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; @AllArgsConstructor @NoArgsConstructor public class BacklogQuotaImpl implements BacklogQuota { + public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024; + + // backlog quota by size in byte private long limitSize; // backlog quota by time in second private int limitTime; @@ -37,7 +40,7 @@ public class BacklogQuotaImpl implements BacklogQuota { } public static class BacklogQuotaImplBuilder implements BacklogQuota.Builder { - private long limitSize; + private long limitSize = -1 * BYTES_IN_GIGABYTE; private int limitTime = -1; private RetentionPolicy retentionPolicy; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index eaecfbe..af30e93 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1154,9 +1154,11 @@ public class NamespacesImpl extends BaseResource implements Namespaces { } @Override - public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException { + public void setBacklogQuota(String namespace, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType) throws PulsarAdminException { try { - setBacklogQuotaAsync(namespace, 
backlogQuota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + setBacklogQuotaAsync(namespace, backlogQuota, backlogQuotaType) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -1168,16 +1170,19 @@ public class NamespacesImpl extends BaseResource implements Namespaces { } @Override - public CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota) { + public CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "backlogQuota"); - return asyncPostRequest(path, Entity.entity(backlogQuota, MediaType.APPLICATION_JSON)); + return asyncPostRequest(path.queryParam("backlogQuotaType", backlogQuotaType.toString()), + Entity.entity(backlogQuota, MediaType.APPLICATION_JSON)); } @Override - public void removeBacklogQuota(String namespace) throws PulsarAdminException { + public void removeBacklogQuota(String namespace, BacklogQuota.BacklogQuotaType backlogQuotaType) + throws PulsarAdminException { try { - removeBacklogQuotaAsync(namespace). + removeBacklogQuotaAsync(namespace, backlogQuotaType). 
get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); @@ -1212,10 +1217,11 @@ public class NamespacesImpl extends BaseResource implements Namespaces { } @Override - public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) { + public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace, + BacklogQuota.BacklogQuotaType backlogQuotaType) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "backlogQuota") - .queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()); + .queryParam("backlogQuotaType", backlogQuotaType.toString()); return asyncDeleteRequest(path); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index ff5a8f3..529bb23 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1811,22 +1811,24 @@ public class TopicsImpl extends BaseResource implements Topics { } @Override - public void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException { + public void setBacklogQuota(String topic, BacklogQuota backlogQuota, + BacklogQuotaType backlogQuotaType) throws PulsarAdminException { try { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "backlogQuota"); - request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class); + request(path.queryParam("backlogQuotaType", backlogQuotaType.toString())) + .post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } } @Override - public void removeBacklogQuota(String topic) throws PulsarAdminException { + public void 
removeBacklogQuota(String topic, BacklogQuotaType backlogQuotaType) throws PulsarAdminException { try { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "backlogQuota"); - request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString())) + request(path.queryParam("backlogQuotaType", backlogQuotaType.toString())) .delete(ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5d27c7f..f83749c 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -418,7 +418,8 @@ public class PulsarAdminToolTest { BacklogQuota.builder() .limitSize(10) .retentionPolicy(RetentionPolicy.producer_request_hold) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); @@ -429,7 +430,8 @@ public class PulsarAdminToolTest { BacklogQuota.builder() .limitSize(10 * 1024) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); @@ -440,7 +442,8 @@ public class PulsarAdminToolTest { BacklogQuota.builder() .limitSize(10 * 1024 * 1024) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); @@ -451,19 +454,21 @@ public class PulsarAdminToolTest { BacklogQuota.builder() .limitSize(10L * 1024 * 1024 * 1024) 
.retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); namespaces = new CmdNamespaces(() -> admin); - namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000")); + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age")); verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", BacklogQuota.builder() .limitSize(10l * 1024 * 1024 * 1024) .limitTime(10000) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.message_age); namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 -r 100.0")); verify(mockNamespaces).setPersistence("myprop/clust/ns1", @@ -905,15 +910,31 @@ public class PulsarAdminToolTest { cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap")); verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true); - cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -lt 1000 -p producer_request_hold")); + cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold")); verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.builder() .limitSize(10) + .retentionPolicy(RetentionPolicy.producer_request_hold) + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); + //cmd with option cannot be executed repeatedly. 
+ cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age")); + verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1", + BacklogQuota.builder() + .limitSize(-1) .limitTime(1000) .retentionPolicy(RetentionPolicy.producer_request_hold) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.message_age); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1")); - verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1"); + verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.BacklogQuotaType.destination_storage); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1 -t message_age")); + verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.BacklogQuotaType.message_age); cmdTopics.run(split("info-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index a44022e..bac6a39 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -1095,6 +1095,9 @@ public class CmdNamespaces extends CmdBase { + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true) private String policyStr; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set") + private String backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { BacklogQuota.RetentionPolicy policy; @@ -1114,7 +1117,8 @@ public class CmdNamespaces extends CmdBase { BacklogQuota.builder().limitSize(limit) .limitTime(limitTime) .retentionPolicy(policy) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } @@ -1123,10 +1127,13 @@ public class CmdNamespaces extends CmdBase { @Parameter(description = "tenant/namespace", required = true) private java.util.List<String> params; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - getAdmin().namespaces().removeBacklogQuota(namespace); + getAdmin().namespaces().removeBacklogQuota(namespace, BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 0d0a757..cdd91b1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1120,8 +1120,8 @@ public class CmdTopics extends CmdBase { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List<String> params; - @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true) - private String limitStr; + @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)") + private String limitStr = "-1"; @Parameter(names = { "-lt", "--limitTime" }, description = "Time limit in second, non-positive number for disabling time limit.") private int limitTime = -1; @@ -1130,6 +1130,9 @@ public 
class CmdTopics extends CmdBase { + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true) private String policyStr; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { BacklogQuota.RetentionPolicy policy; @@ -1149,7 +1152,8 @@ public class CmdTopics extends CmdBase { .limitSize(limit) .limitTime(limitTime) .retentionPolicy(policy) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } @@ -1159,10 +1163,13 @@ public class CmdTopics extends CmdBase { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List<String> params; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - getTopics().removeBacklogQuota(persistentTopic); + getTopics().removeBacklogQuota(persistentTopic, BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } diff --git a/site2/docs/cookbooks-retention-expiry.md b/site2/docs/cookbooks-retention-expiry.md index 78635ba..94d2b83 100644 --- a/site2/docs/cookbooks-retention-expiry.md +++ b/site2/docs/cookbooks-retention-expiry.md @@ -179,17 +179,23 @@ You can set a size and/or time threshold and backlog retention policy for all of #### pulsar-admin -Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand and specify a namespace, a size limit using the `-l`/`--limit` flag, and a retention policy using the `-p`/`--policy` flag. 
+Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand and specify a namespace, a size limit using the `-l`/`--limit` flag and/or a time limit using the `-lt`/`--limitTime` flag, a retention policy using the `-p`/`--policy` flag, and a backlog quota type using the `-t`/`--type` flag (default is `destination_storage`). ##### Example ```shell $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ --limit 2G \ - --limitTime 36000 \ --policy producer_request_hold ``` +```shell +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ +--limitTime 3600 \ +--policy producer_request_hold \ +--type message_age +``` + #### REST API {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/getBacklogQuotaMap?version=[[pulsar:version_number]]} @@ -236,7 +242,7 @@ Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas = #### pulsar-admin -Use the [`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota) subcommand and specify a namespace. Here's an example: +Use the [`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota) subcommand and specify a namespace. Use the `-t`/`--type` flag to specify the backlog quota type to remove (default is `destination_storage`). Here's an example: ```shell $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index e15104a..0cba6c3 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1120,7 +1120,9 @@ Options |Flag|Description|Default| |----|---|---| |`-l`, `--limit`|The backlog size limit (for example `10M` or `16G`)|| +|`-lt`, `--limitTime`|Time limit in seconds; a non-positive number disables the time limit (for example, `3600` for 1 hour)|| |`-p`, `--policy`|The retention policy to enforce when the limit is reached.
The valid options are: `producer_request_hold`, `producer_exception` or `consumer_backlog_eviction`| +|`-t`, `--type`|Backlog quota type to set. The valid options are: `destination_storage`, `message_age` |destination_storage| Example ```bash @@ -1129,9 +1131,20 @@ $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ --policy producer_request_hold ``` +```bash +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ +--limitTime 3600 \ +--policy producer_request_hold \ +--type message_age +``` + ### `remove-backlog-quota` Remove a backlog quota policy from a namespace +|Flag|Description|Default| +|---|---|---| +|`-t`, `--type`|Backlog quota type to remove. The valid options are: `destination_storage`, `message_age` |destination_storage| + Usage ```bash $ pulsar-admin namespaces remove-backlog-quota tenant/namespace @@ -2347,14 +2360,39 @@ $ pulsar-admin topics get-backlog-quotas tenant/namespace/topic ### `set-backlog-quota` Set a backlog quota policy for a topic. +|Flag|Description|Default| +|----|---|---| +|`-l`, `--limit`|The backlog size limit (for example `10M` or `16G`)|| +|`-lt`, `--limitTime`|Time limit in seconds; a non-positive number disables the time limit (for example, `3600` for 1 hour)|| +|`-p`, `--policy`|The retention policy to enforce when the limit is reached. The valid options are: `producer_request_hold`, `producer_exception` or `consumer_backlog_eviction`|| +|`-t`, `--type`|Backlog quota type to set.
The valid options are: `destination_storage`, `message_age` |destination_storage| + Usage ```bash $ pulsar-admin topics set-backlog-quota tenant/namespace/topic options ``` +Example +```bash +$ pulsar-admin topics set-backlog-quota my-tenant/my-ns/my-topic \ +--limit 2G \ +--policy producer_request_hold +``` + +```bash +$ pulsar-admin topics set-backlog-quota my-tenant/my-ns/my-topic \ +--limitTime 3600 \ +--policy producer_request_hold \ +--type message_age +``` + ### `remove-backlog-quota` Remove a backlog quota policy from a topic. +|Flag|Description|Default| +|---|---|---| +|`-t`, `--type`|Backlog quota type to remove. The valid options are: `destination_storage`, `message_age` |destination_storage| + Usage ```bash $ pulsar-admin topics remove-backlog-quota tenant/namespace/topic
