This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 353c72da2f68890fd33c22bbb0a7c152287a5ec6
Author: Marvin Cai <[email protected]>
AuthorDate: Thu Aug 5 20:41:18 2021 -0700

    Fix time based backlog quota. (#11509)
    
    Fixes #11404
    
    ### Motivation
    Time based backlog quota type message_age is set separately but when check 
backlog we are only checking against destination_storage type.
    So fix to loop through all BacklogQuotaType when checking if backlog 
exceeded.
    
    ### Modification
    * Added unit test.
    * Added default implementation to make Admin Topic/Namespace backlog quota 
related API backward compatible.
    
    (cherry picked from commit e82df7cb34dee301407d03d14f08368e63b792b5)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |   6 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  43 ++--
 .../pulsar/broker/service/BacklogQuotaManager.java |  23 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    |  36 ++--
 .../org/apache/pulsar/broker/service/Topic.java    |   4 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   4 +-
 .../broker/service/persistent/PersistentTopic.java |  18 +-
 .../stats/prometheus/NamespaceStatsAggregator.java |   7 +-
 .../org/apache/pulsar/broker/ConfigHelper.java     |  20 +-
 .../broker/admin/TopicPoliciesDisableTest.java     |   4 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 230 +++++++++++++++++++--
 .../broker/service/BacklogQuotaManagerTest.java    |  18 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  28 ++-
 .../org/apache/pulsar/client/admin/Topics.java     |  19 +-
 .../policies/data/impl/BacklogQuotaImpl.java       |   5 +-
 .../client/admin/internal/NamespacesImpl.java      |  22 +-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  10 +-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  39 +++-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  11 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  15 +-
 site2/docs/cookbooks-retention-expiry.md           |  12 +-
 site2/docs/reference-pulsar-admin.md               |  38 ++++
 22 files changed, 472 insertions(+), 140 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 568d569..7a3c267 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -358,8 +358,10 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     }
 
-    protected BacklogQuota namespaceBacklogQuota(String namespace, String 
namespacePath) {
-        return 
pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, 
namespacePath);
+    protected BacklogQuota namespaceBacklogQuota(String namespace, String 
namespacePath,
+                                                 BacklogQuota.BacklogQuotaType 
backlogQuotaType) {
+        return pulsar().getBrokerService().getBacklogQuotaManager()
+                .getBacklogQuota(namespace, namespacePath, backlogQuotaType);
     }
 
     protected CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsyncWithRetry(TopicName topicName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7540206..9bbf018 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2625,14 +2625,17 @@ public class PersistentTopicsBase extends AdminResource 
{
                     quotaMap = 
getNamespacePolicies(namespaceName).backlog_quota_map;
                     if (quotaMap.isEmpty()) {
                         String namespace = namespaceName.toString();
-                        quotaMap.put(
-                                
BacklogQuota.BacklogQuotaType.destination_storage,
-                                namespaceBacklogQuota(namespace, 
AdminResource.path(POLICIES, namespace))
-                        );
+                        for (BacklogQuota.BacklogQuotaType backlogQuotaType : 
BacklogQuota.BacklogQuotaType.values()) {
+                            quotaMap.put(
+                                    backlogQuotaType,
+                                    namespaceBacklogQuota(namespace,
+                                            AdminResource.path(POLICIES, 
namespace), backlogQuotaType)
+                            );
+                        }
                     }
                 }
                 return quotaMap;
-            });
+        });
     }
 
     protected CompletableFuture<Void> 
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
@@ -2748,21 +2751,21 @@ public class PersistentTopicsBase extends AdminResource 
{
         return getTopicPoliciesAsyncWithRetry(topicName)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
-                BacklogQuota backlogQuota =
-                        topicPolicies.getBackLogQuotaMap()
-                                
.get(BacklogQuota.BacklogQuotaType.destination_storage.name());
-                if (backlogQuota == null) {
-                    Policies policies = 
getNamespacePolicies(topicName.getNamespaceObject());
-                    backlogQuota = 
policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
-                }
-                if (!checkBacklogQuota(backlogQuota, retention)) {
-                    log.warn(
-                            "[{}] Failed to update retention quota 
configuration for topic {}: "
-                                    + "conflicts with retention quota",
-                            clientAppId(), topicName);
-                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Retention Quota must exceed configured backlog 
quota for topic. "
-                                    + "Please increase retention quota and 
retry"));
+                for (BacklogQuota.BacklogQuotaType backlogQuotaType : 
BacklogQuota.BacklogQuotaType.values()) {
+                    BacklogQuota backlogQuota = 
topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
+                    if (backlogQuota == null) {
+                        Policies policies = 
getNamespacePolicies(topicName.getNamespaceObject());
+                        backlogQuota = 
policies.backlog_quota_map.get(backlogQuotaType);
+                    }
+                    if (!checkBacklogQuota(backlogQuota, retention)) {
+                        log.warn(
+                                "[{}] Failed to update retention quota 
configuration for topic {}: "
+                                        + "conflicts with retention quota",
+                                clientAppId(), topicName);
+                        return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Retention Quota must exceed configured 
backlog quota for topic. "
+                                        + "Please increase retention quota and 
retry"));
+                    }
                 }
                 topicPolicies.setRetentionPolicies(retention);
                 return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index fa0d24b..bbb3ddf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -54,7 +54,8 @@ public class BacklogQuotaManager {
     public BacklogQuotaManager(PulsarService pulsar) {
         this.isTopicLevelPoliciesEnable = 
pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
         this.defaultQuota = BacklogQuotaImpl.builder()
-                
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 
1024 * 1024)
+                
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB()
+                        * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
                 
.limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
                 
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
                 .build();
@@ -66,11 +67,11 @@ public class BacklogQuotaManager {
         return this.defaultQuota;
     }
 
-    public BacklogQuotaImpl getBacklogQuota(String namespace, String 
policyPath) {
+    public BacklogQuotaImpl getBacklogQuota(String namespace, String 
policyPath, BacklogQuotaType backlogQuotaType) {
         try {
             return zkCache.get(policyPath)
                     .map(p -> (BacklogQuotaImpl) p.backlog_quota_map
-                            
.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota))
+                            .getOrDefault(backlogQuotaType, defaultQuota))
                     .orElse(defaultQuota);
         } catch (Exception e) {
             log.warn("Failed to read policies data, will apply the default 
backlog quota: namespace={}", namespace, e);
@@ -78,30 +79,30 @@ public class BacklogQuotaManager {
         }
     }
 
-    public BacklogQuotaImpl getBacklogQuota(TopicName topicName) {
+    public BacklogQuotaImpl getBacklogQuota(TopicName topicName, 
BacklogQuotaType backlogQuotaType) {
         String policyPath = AdminResource.path(POLICIES, 
topicName.getNamespace());
         if (!isTopicLevelPoliciesEnable) {
-            return getBacklogQuota(topicName.getNamespace(), policyPath);
+            return getBacklogQuota(topicName.getNamespace(), policyPath, 
backlogQuotaType);
         }
 
         try {
             return 
Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
                     .map(TopicPolicies::getBackLogQuotaMap)
-                    .map(map -> 
map.get(BacklogQuotaType.destination_storage.name()))
-                    .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), 
policyPath));
+                    .map(map -> map.get(backlogQuotaType.name()))
+                    .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), 
policyPath, backlogQuotaType));
         } catch (Exception e) {
             log.warn("Failed to read topic policies data, will apply the 
namespace backlog quota: topicName={}",
                     topicName, e);
         }
-        return getBacklogQuota(topicName.getNamespace(), policyPath);
+        return getBacklogQuota(topicName.getNamespace(), policyPath, 
backlogQuotaType);
     }
 
     public long getBacklogQuotaLimitInSize(TopicName topicName) {
-        return getBacklogQuota(topicName).getLimitSize();
+        return getBacklogQuota(topicName, 
BacklogQuotaType.destination_storage).getLimitSize();
     }
 
     public int getBacklogQuotaLimitInTime(TopicName topicName) {
-        return getBacklogQuota(topicName).getLimitTime();
+        return getBacklogQuota(topicName, 
BacklogQuotaType.message_age).getLimitTime();
     }
 
     /**
@@ -112,7 +113,7 @@ public class BacklogQuotaManager {
     public void handleExceededBacklogQuota(PersistentTopic persistentTopic, 
BacklogQuotaType backlogQuotaType,
                                            boolean 
preciseTimeBasedBacklogQuotaCheck) {
         TopicName topicName = TopicName.get(persistentTopic.getName());
-        BacklogQuota quota = getBacklogQuota(topicName);
+        BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType);
         log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] 
policy", backlogQuotaType,
                 persistentTopic.getName(), quota.getPolicy());
         switch (quota.getPolicy()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ac50c53..511d5a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1182,23 +1182,27 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                         
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                             // Before creating producer, check if backlog 
quota exceeded
-                            // on topic
-                            if (topic.isBacklogQuotaExceeded(producerName)) {
-                                IllegalStateException illegalStateException = 
new IllegalStateException(
-                                        "Cannot create producer on topic with 
backlog quota exceeded");
-                                BacklogQuota.RetentionPolicy retentionPolicy = 
topic.getBacklogQuota().getPolicy();
-                                if (retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_request_hold) {
-                                    commandSender.sendErrorResponse(requestId,
-                                            
ServerError.ProducerBlockedQuotaExceededError,
-                                            
illegalStateException.getMessage());
-                                } else if (retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_exception) {
-                                    commandSender.sendErrorResponse(requestId,
-                                            
ServerError.ProducerBlockedQuotaExceededException,
-                                            
illegalStateException.getMessage());
+                            // on topic for size based limit and time based 
limit
+                            for (BacklogQuota.BacklogQuotaType 
backlogQuotaType :
+                                    BacklogQuota.BacklogQuotaType.values()) {
+                                if (topic.isBacklogQuotaExceeded(producerName, 
backlogQuotaType)) {
+                                    IllegalStateException 
illegalStateException = new IllegalStateException(
+                                            "Cannot create producer on topic 
with backlog quota exceeded");
+                                    BacklogQuota.RetentionPolicy 
retentionPolicy = topic
+                                            
.getBacklogQuota(backlogQuotaType).getPolicy();
+                                    if (retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_request_hold) {
+                                        
commandSender.sendErrorResponse(requestId,
+                                                
ServerError.ProducerBlockedQuotaExceededError,
+                                                
illegalStateException.getMessage());
+                                    } else if (retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_exception) {
+                                        
commandSender.sendErrorResponse(requestId,
+                                                
ServerError.ProducerBlockedQuotaExceededException,
+                                                
illegalStateException.getMessage());
+                                    }
+                                    
producerFuture.completeExceptionally(illegalStateException);
+                                    producers.remove(producerId, 
producerFuture);
+                                    return;
                                 }
-                                
producerFuture.completeExceptionally(illegalStateException);
-                                producers.remove(producerId, producerFuture);
-                                return;
                             }
 
                             // Check whether the producer will publish 
encrypted messages or not
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d1e4506..eaed76c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -176,7 +176,7 @@ public interface Topic {
 
     CompletableFuture<Void> onPoliciesUpdate(Policies data);
 
-    boolean isBacklogQuotaExceeded(String producerName);
+    boolean isBacklogQuotaExceeded(String producerName, 
BacklogQuota.BacklogQuotaType backlogQuotaType);
 
     boolean isEncryptionRequired();
 
@@ -184,7 +184,7 @@ public interface Topic {
 
     boolean isReplicated();
 
-    BacklogQuota getBacklogQuota();
+    BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType 
backlogQuotaType);
 
     void updateRates(NamespaceStats nsStats, NamespaceBundleStats 
currentBundleStats,
             StatsOutputStream topicStatsStream, ClusterReplicationMetrics 
clusterReplicationMetrics,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5224eaa..37680d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -965,7 +965,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
      * @return Backlog quota for topic
      */
     @Override
-    public BacklogQuota getBacklogQuota() {
+    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType 
backlogQuotaType) {
         // No-op
         throw new UnsupportedOperationException("getBacklogQuota method is not 
supported on non-persistent topic");
     }
@@ -975,7 +975,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
      * @return quota exceeded status for blocking producer creation
      */
     @Override
-    public boolean isBacklogQuotaExceeded(String producerName) {
+    public boolean isBacklogQuotaExceeded(String producerName, 
BacklogQuota.BacklogQuotaType backlogQuotaType) {
         // No-op
         return false;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2f9a91d..73c28f7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2369,9 +2369,9 @@ public class PersistentTopic extends AbstractTopic
      * @return Backlog quota for topic
      */
     @Override
-    public BacklogQuota getBacklogQuota() {
+    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType 
backlogQuotaType) {
         TopicName topicName = TopicName.get(this.getName());
-        return 
brokerService.getBacklogQuotaManager().getBacklogQuota(topicName);
+        return 
brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, 
backlogQuotaType);
     }
 
     /**
@@ -2379,17 +2379,19 @@ public class PersistentTopic extends AbstractTopic
      * @return quota exceeded status for blocking producer creation
      */
     @Override
-    public boolean isBacklogQuotaExceeded(String producerName) {
-        BacklogQuota backlogQuota = getBacklogQuota();
+    public boolean isBacklogQuotaExceeded(String producerName, 
BacklogQuota.BacklogQuotaType backlogQuotaType) {
+        BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType);
 
         if (backlogQuota != null) {
             BacklogQuota.RetentionPolicy retentionPolicy = 
backlogQuota.getPolicy();
 
             if ((retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_request_hold
-                    || retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_exception)
-                    && (isSizeBacklogExceeded() || isTimeBacklogExceeded())) {
-                log.info("[{}] Backlog quota exceeded. Cannot create producer 
[{}]", this.getName(), producerName);
-                return true;
+                    || retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_exception)) {
+                if (backlogQuotaType == 
BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded()
+                || backlogQuotaType == 
BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){
+                    log.info("[{}] Backlog quota exceeded. Cannot create 
producer [{}]", this.getName(), producerName);
+                    return true;
+                }
             } else {
                 return false;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index c08641d..e041a01 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,6 +25,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
@@ -99,8 +100,10 @@ public class NamespaceStatsAggregator {
             stats.managedLedgerStats.storageLogicalSize = 
mlStats.getStoredMessagesLogicalSize();
             stats.managedLedgerStats.backlogSize = 
ml.getEstimatedBacklogSize();
             stats.managedLedgerStats.offloadedStorageUsed = 
ml.getOffloadedSize();
-            stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize();
-            stats.backlogQuotaLimitTime = 
topic.getBacklogQuota().getLimitTime();
+            stats.backlogQuotaLimit = topic
+                    
.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
+            stats.backlogQuotaLimitTime = topic
+                    
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
 
             stats.managedLedgerStats.storageWriteLatencyBuckets
                     .addAll(mlStats.getInternalAddEntryLatencyBuckets());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
index 86cf2aa..b929636 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
@@ -18,11 +18,12 @@
  */
 package org.apache.pulsar.broker;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class ConfigHelper {
@@ -30,13 +31,22 @@ public class ConfigHelper {
 
 
     public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> 
backlogQuotaMap(ServiceConfiguration configuration) {
-        return 
Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage,
-                backlogQuota(configuration));
+        return 
ImmutableMap.of(BacklogQuota.BacklogQuotaType.destination_storage,
+                sizeBacklogQuota(configuration),
+                BacklogQuota.BacklogQuotaType.message_age,
+                timeBacklogQuota(configuration));
     }
 
-    public static BacklogQuota backlogQuota(ServiceConfiguration 
configuration) {
+    public static BacklogQuota sizeBacklogQuota(ServiceConfiguration 
configuration) {
         return BacklogQuota.builder()
-                .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * 
1024 * 1024 * 1024)
+                .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * 
BacklogQuotaImpl.BYTES_IN_GIGABYTE)
+                
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
+                .build();
+    }
+
+    public static BacklogQuota timeBacklogQuota(ServiceConfiguration 
configuration) {
+        return BacklogQuota.builder()
+                .limitTime(configuration.getBacklogQuotaDefaultLimitSecond())
                 
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
                 .build();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 5e9967b..e9ec190 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -78,14 +78,14 @@ public class TopicPoliciesDisableTest extends 
MockedPulsarServiceBaseTest {
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
         try {
-            admin.topics().setBacklogQuota(testTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 
HttpStatus.METHOD_NOT_ALLOWED_405);
         }
 
         try {
-            admin.topics().removeBacklogQuota(testTopic);
+            admin.topics().removeBacklogQuota(testTopic, 
BacklogQuota.BacklogQuotaType.destination_storage);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 
HttpStatus.METHOD_NOT_ALLOWED_405);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index bfb7715..b2a6e83 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -122,7 +122,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testSetBacklogQuota() throws Exception {
+    public void testSetSizeBasedBacklogQuota() throws Exception {
 
         BacklogQuota backlogQuota = BacklogQuota.builder()
                 .limitSize(1024)
@@ -130,7 +130,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
-        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
         log.info("Backlog quota set success on topic: {}", testTopic);
 
         Awaitility.await()
@@ -138,7 +138,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                         
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
 
         BacklogQuotaManager backlogQuotaManager = 
pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = 
backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager
+                .getBacklogQuota(TopicName.get(testTopic), 
BacklogQuota.BacklogQuotaType.destination_storage);
         log.info("Backlog quota {} in backlog quota manager on topic: {}", 
backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuota, backlogQuotaInManager);
 
@@ -146,14 +147,37 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testRemoveBacklogQuota() throws Exception {
+    public void testSetTimeBasedBacklogQuota() throws Exception {
+
+        BacklogQuota backlogQuota = BacklogQuota.builder()
+                .limitTime(1000)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
+                        .get(BacklogQuota.BacklogQuotaType.message_age), 
backlogQuota));
+
+        BacklogQuotaManager backlogQuotaManager = 
pulsar.getBrokerService().getBacklogQuotaManager();
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager
+                .getBacklogQuota(TopicName.get(testTopic), 
BacklogQuota.BacklogQuotaType.message_age);
+
+        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testRemoveSizeBasedBacklogQuota() throws Exception {
         BacklogQuota backlogQuota = BacklogQuota.builder()
                 .limitSize(1024)
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
-        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
         log.info("Backlog quota set success on topic: {}", testTopic);
 
         Awaitility.await()
@@ -161,16 +185,18 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                         
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
 
         BacklogQuotaManager backlogQuotaManager = 
pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = 
backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager
+                .getBacklogQuota(TopicName.get(testTopic), 
BacklogQuota.BacklogQuotaType.destination_storage);
         log.info("Backlog quota {} in backlog quota manager on topic: {}", 
backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuota, backlogQuotaInManager);
 
-        admin.topics().removeBacklogQuota(testTopic);
+        admin.topics().removeBacklogQuota(testTopic, 
BacklogQuota.BacklogQuotaType.destination_storage);
         Awaitility.await()
                 .untilAsserted(() -> 
Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
                         
.get(BacklogQuota.BacklogQuotaType.destination_storage)));
 
-        backlogQuotaInManager = 
backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        backlogQuotaInManager = backlogQuotaManager
+                .getBacklogQuota(TopicName.get(testTopic), 
BacklogQuota.BacklogQuotaType.destination_storage);
         log.info("Backlog quota {} in backlog quota manager on topic: {} after 
remove", backlogQuotaInManager,
                 testTopic);
         Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), 
backlogQuotaInManager);
@@ -179,7 +205,37 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testCheckBacklogQuota() throws Exception {
+    public void testRemoveTimeBasedBacklogQuota() throws Exception {
+        BacklogQuota backlogQuota = BacklogQuota.builder()
+                .limitTime(1000)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
+                        .get(BacklogQuota.BacklogQuotaType.message_age), 
backlogQuota));
+
+        BacklogQuotaManager backlogQuotaManager = 
pulsar.getBrokerService().getBacklogQuotaManager();
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager
+                .getBacklogQuota(TopicName.get(testTopic), 
BacklogQuota.BacklogQuotaType.message_age);
+        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+
+        admin.topics().removeBacklogQuota(testTopic, 
BacklogQuota.BacklogQuotaType.message_age);
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
+                        .get(BacklogQuota.BacklogQuotaType.message_age)));
+
+        backlogQuotaInManager = backlogQuotaManager
+                .getBacklogQuota(TopicName.get(testTopic), 
BacklogQuota.BacklogQuotaType.message_age);
+        Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), 
backlogQuotaInManager);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testCheckSizeBasedBacklogQuota() throws Exception {
         RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
         String namespace = TopicName.get(testTopic).getNamespace();
         admin.namespaces().setRetention(namespace, retentionPolicies);
@@ -193,7 +249,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
         try {
-            admin.topics().setBacklogQuota(testTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
@@ -205,7 +261,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
         try {
-            admin.topics().setBacklogQuota(testTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
@@ -216,7 +272,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
-        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
 
         BacklogQuota finalBacklogQuota = backlogQuota;
         Awaitility.await()
@@ -226,8 +282,56 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
+    @Test
+    public void testCheckTimeBasedBacklogQuota() throws Exception {
+        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
+        String namespace = TopicName.get(testTopic).getNamespace();
+        admin.namespaces().setRetention(namespace, retentionPolicies);
+
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertEquals(admin.namespaces().getRetention(namespace), 
retentionPolicies));
+
+        BacklogQuota backlogQuota = BacklogQuota.builder()
+                .limitTime(10 * 60)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
+        try {
+            admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        backlogQuota = BacklogQuota.builder()
+                .limitTime(10 * 60 + 1)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
+        try {
+            admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        backlogQuota = BacklogQuota.builder()
+                .limitTime(10 * 60 - 1)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+
+        BacklogQuota finalBacklogQuota = backlogQuota;
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
+                        .get(BacklogQuota.BacklogQuotaType.message_age), 
finalBacklogQuota));
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
     @Test(timeOut = 20000)
-    public void testGetBacklogQuotaApplied() throws Exception {
+    public void testGetSizeBasedBacklogQuotaApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();
         assertEquals(admin.topics().getBacklogQuotaMap(topic), 
Maps.newHashMap());
@@ -236,6 +340,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getBacklogQuotaMap(topic, true), 
brokerQuotaMap);
         BacklogQuota namespaceQuota = BacklogQuota.builder()
                 .limitSize(30)
+                .limitTime(10)
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                 .build();
 
@@ -243,23 +348,67 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> 
assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
         Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = 
Maps.newHashMap();
         
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, 
namespaceQuota);
+        namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, 
BacklogQuota.builder()
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
         assertEquals(admin.topics().getBacklogQuotaMap(topic, true), 
namespaceQuotaMap);
 
         BacklogQuota topicQuota = BacklogQuota.builder()
                 .limitSize(40)
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
-        admin.topics().setBacklogQuota(topic, topicQuota);
+        admin.topics().setBacklogQuota(topic, topicQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
         Awaitility.await().untilAsserted(() -> 
assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
         Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = 
Maps.newHashMap();
         topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, 
topicQuota);
         assertEquals(admin.topics().getBacklogQuotaMap(topic, true), 
topicQuotaMap);
 
         admin.namespaces().removeBacklogQuota(myNamespace);
-        admin.topics().removeBacklogQuota(topic);
-        Awaitility.await().untilAsserted(() -> 
assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
+        admin.topics().removeBacklogQuota(topic, 
BacklogQuota.BacklogQuotaType.destination_storage);
+        Awaitility.await().untilAsserted(() -> 
assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage) == 
null));
         Awaitility.await().untilAsserted(() -> 
assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
+        assertTrue(admin.topics().getBacklogQuotaMap(topic, true)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage) == 
null);
+    }
+
+    @Test(timeOut = 20000)
+    public void testGetTimeBasedBacklogQuotaApplied() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        pulsarClient.newProducer().topic(topic).create().close();
+        assertEquals(admin.topics().getBacklogQuotaMap(topic), 
Maps.newHashMap());
+        assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), 
Maps.newHashMap());
+        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = 
ConfigHelper.backlogQuotaMap(conf);
         assertEquals(admin.topics().getBacklogQuotaMap(topic, true), 
brokerQuotaMap);
+        BacklogQuota namespaceQuota = BacklogQuota.builder()
+                .limitTime(30)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+
+        admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+        Awaitility.await().untilAsserted(() -> 
assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
+        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = 
Maps.newHashMap();
+        namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, 
namespaceQuota);
+        
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, 
BacklogQuota.builder()
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
+        assertEquals(admin.topics().getBacklogQuotaMap(topic, true), 
namespaceQuotaMap);
+
+        BacklogQuota topicQuota = BacklogQuota.builder()
+                .limitTime(40)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+        admin.topics().setBacklogQuota(topic, topicQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+        Awaitility.await().untilAsserted(() -> 
assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
+        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = 
Maps.newHashMap();
+        topicQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, 
topicQuota);
+        assertEquals(admin.topics().getBacklogQuotaMap(topic, true), 
topicQuotaMap);
+
+        admin.namespaces().removeBacklogQuota(myNamespace, 
BacklogQuota.BacklogQuotaType.message_age);
+        admin.topics().removeBacklogQuota(topic, 
BacklogQuota.BacklogQuotaType.message_age);
+        Awaitility.await().untilAsserted(() -> 
assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace)
+                .get(BacklogQuota.BacklogQuotaType.message_age) == null));
+        Awaitility.await().untilAsserted(() -> 
assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
+        assertTrue(admin.topics().getBacklogQuotaMap(topic, true)
+                .get(BacklogQuota.BacklogQuotaType.message_age) == null);
     }
 
     @Test
@@ -276,7 +425,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
         try {
-            admin.topics().setBacklogQuota(testTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
@@ -288,13 +437,13 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testCheckRetention() throws Exception {
+    public void testCheckRetentionSizeBasedQuota() throws Exception {
         BacklogQuota backlogQuota = BacklogQuota.builder()
                 .limitSize(10 * 1024 * 1024)
                 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
 
-        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
         Awaitility.await()
                 .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
@@ -329,6 +478,47 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testCheckRetentionTimeBasedQuota() throws Exception {
+        BacklogQuota backlogQuota = BacklogQuota.builder()
+                .limitTime(10 * 60)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                .build();
+
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.message_age);
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
+                        .get(BacklogQuota.BacklogQuotaType.message_age), 
backlogQuota));
+
+        RetentionPolicies retention = new RetentionPolicies(10, 10);
+        log.info("Retention: {} will set to the topic: {}", retention, 
testTopic);
+        try {
+            admin.topics().setRetention(testTopic, retention);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        retention = new RetentionPolicies(9, 10);
+        log.info("Retention: {} will set to the topic: {}", retention, 
testTopic);
+        try {
+            admin.topics().setRetention(testTopic, retention);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        retention = new RetentionPolicies(12, 10);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
+        admin.topics().setRetention(testTopic, retention);
+
+        RetentionPolicies finalRetention = retention;
+        Awaitility.await()
+                .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getRetention(testTopic), finalRetention));
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
     public void testSetRetention() throws Exception {
         RetentionPolicies retention = new RetentionPolicies(60, 1024);
         log.info("Retention: {} will set to the topic: {}", retention, 
testTopic);
@@ -2186,7 +2376,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
 
-        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
         log.info("Backlog quota set success on topic: {}", testTopic);
 
         Awaitility.await()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 552ba5a..560cd9e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -398,10 +398,9 @@ public class BacklogQuotaManagerTest {
                 Maps.newHashMap());
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 BacklogQuota.builder()
-                        .limitSize(20 * 1024)
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build());
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS)
                 .build();
@@ -441,10 +440,9 @@ public class BacklogQuotaManagerTest {
                 Maps.newHashMap());
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 BacklogQuota.builder()
-                        .limitSize(20 * 1024)
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build());
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
         PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS)
                 .build();
 
@@ -519,10 +517,9 @@ public class BacklogQuotaManagerTest {
                 Maps.newHashMap());
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 BacklogQuota.builder()
-                        .limitSize(10 * 1024)
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build());
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
 
@@ -583,10 +580,9 @@ public class BacklogQuotaManagerTest {
                 Maps.newHashMap());
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 BacklogQuota.builder()
-                        .limitSize(20 * 1024)
                         .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build());
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
         @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
 
@@ -1090,10 +1086,9 @@ public class BacklogQuotaManagerTest {
                 Maps.newHashMap());
         admin.namespaces().setBacklogQuota("prop/quotahold",
                 BacklogQuota.builder()
-                        .limitSize(10 * 1024)
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
-                        .build());
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         final PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
@@ -1157,10 +1152,9 @@ public class BacklogQuotaManagerTest {
                 Maps.newHashMap());
         admin.namespaces().setBacklogQuota("prop/quotahold",
                 BacklogQuota.builder()
-                        .limitSize(15 * 1024)
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
-                        .build());
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
         final PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/exceptandunblock2";
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 6e66d3b..f6f8654 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1529,7 +1529,12 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws 
PulsarAdminException;
+    void setBacklogQuota(String namespace, BacklogQuota backlogQuota, 
BacklogQuota.BacklogQuotaType backlogQuotaType)
+            throws PulsarAdminException;
+
+    default void setBacklogQuota(String namespace, BacklogQuota backlogQuota) 
throws PulsarAdminException {
+        setBacklogQuota(namespace, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
+    }
 
     /**
      * Set a backlog quota for all the topics on a namespace asynchronously.
@@ -1554,7 +1559,12 @@ public interface Namespaces {
      * @param backlogQuota
      *            the new BacklogQuota
      */
-    CompletableFuture<Void> setBacklogQuotaAsync(String namespace, 
BacklogQuota backlogQuota);
+    CompletableFuture<Void> setBacklogQuotaAsync(String namespace, 
BacklogQuota backlogQuota,
+                                                 BacklogQuota.BacklogQuotaType 
backlogQuotaType);
+
+    default CompletableFuture<Void> setBacklogQuotaAsync(String namespace, 
BacklogQuota backlogQuota) {
+        return setBacklogQuotaAsync(namespace, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
+    }
 
     /**
      * Remove a backlog quota policy from a namespace.
@@ -1573,7 +1583,12 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void removeBacklogQuota(String namespace) throws PulsarAdminException;
+    void removeBacklogQuota(String namespace, BacklogQuota.BacklogQuotaType 
backlogQuotaType)
+            throws PulsarAdminException;
+
+    default void removeBacklogQuota(String namespace) throws 
PulsarAdminException {
+        removeBacklogQuota(namespace, 
BacklogQuota.BacklogQuotaType.destination_storage);
+    }
 
     /**
      * Remove a backlog quota policy from a namespace asynchronously.
@@ -1585,7 +1600,12 @@ public interface Namespaces {
      * @param namespace
      *            Namespace name
      */
-    CompletableFuture<Void> removeBacklogQuotaAsync(String namespace);
+    CompletableFuture<Void> removeBacklogQuotaAsync(String namespace, 
BacklogQuota.BacklogQuotaType backlogQuotaType);
+
+    default CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
+        return removeBacklogQuotaAsync(namespace, 
BacklogQuota.BacklogQuotaType.destination_storage);
+    }
+
 
     /**
      * Remove the persistence configuration on a namespace.
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 9f7dc5e..c48a44b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1584,7 +1584,8 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String 
topic) throws PulsarAdminException;
+    Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String 
topic)
+            throws PulsarAdminException;
 
     /**
      * Get applied backlog quota map for a topic.
@@ -1617,6 +1618,7 @@ public interface Topics {
      *            Topic name
      * @param backlogQuota
      *            the new BacklogQuota
+     * @param backlogQuotaType
      *
      * @throws NotAuthorizedException
      *             Don't have admin permission
@@ -1625,7 +1627,12 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws 
PulsarAdminException;
+    void setBacklogQuota(String topic, BacklogQuota backlogQuota,
+                         BacklogQuota.BacklogQuotaType backlogQuotaType) 
throws PulsarAdminException;
+
+    default void setBacklogQuota(String topic, BacklogQuota backlogQuota) 
throws PulsarAdminException {
+        setBacklogQuota(topic, backlogQuota, 
BacklogQuota.BacklogQuotaType.destination_storage);
+    }
 
     /**
      * Remove a backlog quota policy from a topic.
@@ -1633,6 +1640,7 @@ public interface Topics {
      *
      * @param topic
      *            Topic name
+     * @param backlogQuotaType
      *
      * @throws NotAuthorizedException
      *             Don't have admin permission
@@ -1641,7 +1649,12 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void removeBacklogQuota(String topic) throws PulsarAdminException;
+    void removeBacklogQuota(String topic, BacklogQuota.BacklogQuotaType 
backlogQuotaType) throws PulsarAdminException;
+
+    default void removeBacklogQuota(String topic)
+            throws PulsarAdminException {
+        removeBacklogQuota(topic, 
BacklogQuota.BacklogQuotaType.destination_storage);
+    }
 
     /**
      * Get the delayed delivery policy applied for a specified topic.
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
index f60f4ca..79e0af2 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
@@ -27,6 +27,9 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 @AllArgsConstructor
 @NoArgsConstructor
 public class BacklogQuotaImpl implements BacklogQuota {
+    public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024;
+
+    // backlog quota by size in byte
     private long limitSize;
     // backlog quota by time in second
     private int limitTime;
@@ -37,7 +40,7 @@ public class BacklogQuotaImpl implements BacklogQuota {
     }
 
     public static class BacklogQuotaImplBuilder implements 
BacklogQuota.Builder {
-        private long limitSize;
+        private long limitSize = -1 * BYTES_IN_GIGABYTE;
         private int limitTime = -1;
         private RetentionPolicy retentionPolicy;
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index eaecfbe..af30e93 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1154,9 +1154,11 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
-    public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) 
throws PulsarAdminException {
+    public void setBacklogQuota(String namespace, BacklogQuota backlogQuota,
+                                BacklogQuota.BacklogQuotaType 
backlogQuotaType) throws PulsarAdminException {
         try {
-            setBacklogQuotaAsync(namespace, 
backlogQuota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+            setBacklogQuotaAsync(namespace, backlogQuota, backlogQuotaType)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -1168,16 +1170,19 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
-    public CompletableFuture<Void> setBacklogQuotaAsync(String namespace, 
BacklogQuota backlogQuota) {
+    public CompletableFuture<Void> setBacklogQuotaAsync(String namespace, 
BacklogQuota backlogQuota,
+                                                        
BacklogQuota.BacklogQuotaType backlogQuotaType) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "backlogQuota");
-        return asyncPostRequest(path, Entity.entity(backlogQuota, 
MediaType.APPLICATION_JSON));
+        return asyncPostRequest(path.queryParam("backlogQuotaType", 
backlogQuotaType.toString()),
+                Entity.entity(backlogQuota, MediaType.APPLICATION_JSON));
     }
 
     @Override
-    public void removeBacklogQuota(String namespace) throws 
PulsarAdminException {
+    public void removeBacklogQuota(String namespace, 
BacklogQuota.BacklogQuotaType backlogQuotaType)
+            throws PulsarAdminException {
         try {
-            removeBacklogQuotaAsync(namespace).
+            removeBacklogQuotaAsync(namespace, backlogQuotaType).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
@@ -1212,10 +1217,11 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
-    public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
+    public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace,
+                                                           
BacklogQuota.BacklogQuotaType backlogQuotaType) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "backlogQuota")
-                .queryParam("backlogQuotaType", 
BacklogQuotaType.destination_storage.toString());
+                .queryParam("backlogQuotaType", backlogQuotaType.toString());
         return asyncDeleteRequest(path);
     }
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ff5a8f3..529bb23 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1811,22 +1811,24 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
-    public void setBacklogQuota(String topic, BacklogQuota backlogQuota) 
throws PulsarAdminException {
+    public void setBacklogQuota(String topic, BacklogQuota backlogQuota,
+                                BacklogQuotaType backlogQuotaType) throws 
PulsarAdminException {
         try {
             TopicName tn = validateTopic(topic);
             WebTarget path = topicPath(tn, "backlogQuota");
-            request(path).post(Entity.entity(backlogQuota, 
MediaType.APPLICATION_JSON), ErrorData.class);
+            request(path.queryParam("backlogQuotaType", 
backlogQuotaType.toString()))
+                    .post(Entity.entity(backlogQuota, 
MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
     }
 
     @Override
-    public void removeBacklogQuota(String topic) throws PulsarAdminException {
+    public void removeBacklogQuota(String topic, BacklogQuotaType 
backlogQuotaType) throws PulsarAdminException {
         try {
             TopicName tn = validateTopic(topic);
             WebTarget path = topicPath(tn, "backlogQuota");
-            request(path.queryParam("backlogQuotaType", 
BacklogQuotaType.destination_storage.toString()))
+            request(path.queryParam("backlogQuotaType", 
backlogQuotaType.toString()))
                     .delete(ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5d27c7f..f83749c 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -418,7 +418,8 @@ public class PulsarAdminToolTest {
                 BacklogQuota.builder()
                         .limitSize(10)
                         .retentionPolicy(RetentionPolicy.producer_request_hold)
-                        .build());
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.destination_storage);
 
         mockNamespaces = mock(Namespaces.class);
         when(admin.namespaces()).thenReturn(mockNamespaces);
@@ -429,7 +430,8 @@ public class PulsarAdminToolTest {
                 BacklogQuota.builder()
                         .limitSize(10 * 1024)
                         .retentionPolicy(RetentionPolicy.producer_exception)
-                        .build());
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.destination_storage);
 
         mockNamespaces = mock(Namespaces.class);
         when(admin.namespaces()).thenReturn(mockNamespaces);
@@ -440,7 +442,8 @@ public class PulsarAdminToolTest {
                 BacklogQuota.builder()
                         .limitSize(10 * 1024 * 1024)
                         .retentionPolicy(RetentionPolicy.producer_exception)
-                        .build());
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.destination_storage);
 
         mockNamespaces = mock(Namespaces.class);
         when(admin.namespaces()).thenReturn(mockNamespaces);
@@ -451,19 +454,21 @@ public class PulsarAdminToolTest {
                 BacklogQuota.builder()
                         .limitSize(10L * 1024 * 1024 * 1024)
                         .retentionPolicy(RetentionPolicy.producer_exception)
-                        .build());
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.destination_storage);
 
         mockNamespaces = mock(Namespaces.class);
         when(admin.namespaces()).thenReturn(mockNamespaces);
         namespaces = new CmdNamespaces(() -> admin);
 
-        namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p 
producer_exception -l 10G -lt 10000"));
+        namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p 
producer_exception -l 10G -lt 10000 -t message_age"));
         verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
                 BacklogQuota.builder()
                         .limitSize(10l * 1024 * 1024 * 1024)
                         .limitTime(10000)
                         .retentionPolicy(RetentionPolicy.producer_exception)
-                        .build());
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.message_age);
 
         namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 
-r 100.0"));
         verify(mockNamespaces).setPersistence("myprop/clust/ns1",
@@ -905,15 +910,31 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("get-backlog-quotas 
persistent://myprop/clust/ns1/ds1 -ap"));
         
verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", 
true);
-        cmdTopics.run(split("set-backlog-quota 
persistent://myprop/clust/ns1/ds1 -l 10 -lt 1000 -p producer_request_hold"));
+        cmdTopics.run(split("set-backlog-quota 
persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold"));
         verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
                 BacklogQuota.builder()
                         .limitSize(10)
+                        .retentionPolicy(RetentionPolicy.producer_request_hold)
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.destination_storage);
+        //cmd with option cannot be executed repeatedly.
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("set-backlog-quota 
persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t 
message_age"));
+        verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
+                BacklogQuota.builder()
+                        .limitSize(-1)
                         .limitTime(1000)
                         .retentionPolicy(RetentionPolicy.producer_request_hold)
-                        .build());
+                        .build(),
+                        BacklogQuota.BacklogQuotaType.message_age);
+        //cmd with option cannot be executed repeatedly.
+        cmdTopics = new CmdTopics(() -> admin);
         cmdTopics.run(split("remove-backlog-quota 
persistent://myprop/clust/ns1/ds1"));
-        
verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1");
+        
verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", 
BacklogQuota.BacklogQuotaType.destination_storage);
+        //cmd with option cannot be executed repeatedly.
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("remove-backlog-quota 
persistent://myprop/clust/ns1/ds1 -t message_age"));
+        
verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", 
BacklogQuota.BacklogQuotaType.message_age);
 
         cmdTopics.run(split("info-internal 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a44022e..bac6a39 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1095,6 +1095,9 @@ public class CmdNamespaces extends CmdBase {
                 + "Valid options are: [producer_request_hold, 
producer_exception, consumer_backlog_eviction]", required = true)
         private String policyStr;
 
+        @Parameter(names = {"-t", "--type"}, description = "Backlog quota type 
to set")
+        private String backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage.name();
+
         @Override
         void run() throws PulsarAdminException {
             BacklogQuota.RetentionPolicy policy;
@@ -1114,7 +1117,8 @@ public class CmdNamespaces extends CmdBase {
                     BacklogQuota.builder().limitSize(limit)
                             .limitTime(limitTime)
                             .retentionPolicy(policy)
-                            .build());
+                            .build(),
+                            
BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
         }
     }
 
@@ -1123,10 +1127,13 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(description = "tenant/namespace", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = {"-t", "--type"}, description = "Backlog quota type 
to remove")
+        private String backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage.name();
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            getAdmin().namespaces().removeBacklogQuota(namespace);
+            getAdmin().namespaces().removeBacklogQuota(namespace, 
BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 0d0a757..cdd91b1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1120,8 +1120,8 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
-        @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 
10M, 16G)", required = true)
-        private String limitStr;
+        @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 
10M, 16G)")
+        private String limitStr = "-1";
 
         @Parameter(names = { "-lt", "--limitTime" }, description = "Time limit 
in second, non-positive number for disabling time limit.")
         private int limitTime = -1;
@@ -1130,6 +1130,9 @@ public class CmdTopics extends CmdBase {
                 + "Valid options are: [producer_request_hold, 
producer_exception, consumer_backlog_eviction]", required = true)
         private String policyStr;
 
+        @Parameter(names = {"-t", "--type"}, description = "Backlog quota type 
to set")
+        private String backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage.name();
+
         @Override
         void run() throws PulsarAdminException {
             BacklogQuota.RetentionPolicy policy;
@@ -1149,7 +1152,8 @@ public class CmdTopics extends CmdBase {
                     .limitSize(limit)
                     .limitTime(limitTime)
                     .retentionPolicy(policy)
-                    .build());
+                    .build(),
+                    BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
         }
     }
 
@@ -1159,10 +1163,13 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = {"-t", "--type"}, description = "Backlog quota type 
to remove")
+        private String backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage.name();
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            getTopics().removeBacklogQuota(persistentTopic);
+            getTopics().removeBacklogQuota(persistentTopic, 
BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
         }
     }
 
diff --git a/site2/docs/cookbooks-retention-expiry.md 
b/site2/docs/cookbooks-retention-expiry.md
index 78635ba..94d2b83 100644
--- a/site2/docs/cookbooks-retention-expiry.md
+++ b/site2/docs/cookbooks-retention-expiry.md
@@ -179,17 +179,23 @@ You can set a size and/or time threshold and backlog 
retention policy for all of
 
 #### pulsar-admin
 
-Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand 
and specify a namespace, a size limit using the `-l`/`--limit` flag, and a 
retention policy using the `-p`/`--policy` flag.
+Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand 
and specify a namespace, a size limit using the `-l`/`--limit` , 
`-lt`/`--limitTime` flag to limit backlog, a retention policy using the 
`-p`/`--policy` flag and a policy type using `-t`/`--type` (default is 
destination_storage).
 
 ##### Example
 
 ```shell
 $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
   --limit 2G \
-  --limitTime 36000 \
   --policy producer_request_hold
 ```
 
+```shell
+$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns/my-topic \
+--limitTime 3600 \
+--policy producer_request_hold \
+--type message_age
+```
+
 #### REST API
 
 {@inject: 
endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/getBacklogQuotaMap?version=[[pulsar:version_number]]}
@@ -236,7 +242,7 @@ Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
 
 #### pulsar-admin
 
-Use the 
[`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota)
 subcommand and specify a namespace. Here's an example:
+Use the 
[`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota)
 subcommand and specify a namespace, use `t`/`--type` to specify backlog type 
to remove(default is destination_storage). Here's an example:
 
 ```shell
 $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index e15104a..0cba6c3 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1120,7 +1120,9 @@ Options
 |Flag|Description|Default|
 |----|---|---|
 |`-l`, `--limit`|The backlog size limit (for example `10M` or `16G`)||
+|`-lt`, `--limitTime`|Time limit in second, non-positive number for disabling 
time limit. (for example 3600 for 1 hour)||
 |`-p`, `--policy`|The retention policy to enforce when the limit is reached. 
The valid options are: `producer_request_hold`, `producer_exception` or 
`consumer_backlog_eviction`|
+|`-t`, `--type`|Backlog quota type to set. The valid options are: 
`destination_storage`, `message_age` |destination_storage|
 
 Example
 ```bash
@@ -1129,9 +1131,20 @@ $ pulsar-admin namespaces set-backlog-quota 
my-tenant/my-ns \
 --policy producer_request_hold
 ```
 
+```bash
+$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
+--limitTime 3600 \
+--policy producer_request_hold \
+--type message_age
+```
+
 ### `remove-backlog-quota`
 Remove a backlog quota policy from a namespace
 
+|Flag|Description|Default|
+|---|---|---|
+|`-t`, `--type`|Backlog quota type to remove. The valid options are: 
`destination_storage`, `message_age` |destination_storage|
+
 Usage
 ```bash
 $ pulsar-admin namespaces remove-backlog-quota tenant/namespace
@@ -2347,14 +2360,39 @@ $ pulsar-admin topics get-backlog-quotas 
tenant/namespace/topic
 ### `set-backlog-quota`
 Set a backlog quota policy for a topic.
 
+|Flag|Description|Default|
+|----|---|---|
+|`-l`, `--limit`|The backlog size limit (for example `10M` or `16G`)||
+|`-lt`, `--limitTime`|Time limit in second, non-positive number for disabling 
time limit. (for example 3600 for 1 hour)||
+|`-p`, `--policy`|The retention policy to enforce when the limit is reached. 
The valid options are: `producer_request_hold`, `producer_exception` or 
`consumer_backlog_eviction`|
+|`-t`, `--type`|Backlog quota type to set. The valid options are: 
`destination_storage`, `message_age` |destination_storage|
+
 Usage
 ```bash
 $ pulsar-admin topics set-backlog-quota tenant/namespace/topic options
 ```
 
+Example
+```bash
+$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns/my-topic \
+--limit 2G \
+--policy producer_request_hold
+```
+
+```bash
+$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns/my-topic \
+--limitTime 3600 \
+--policy producer_request_hold \
+--type message_age
+```
+
 ### `remove-backlog-quota`
 Remove a backlog quota policy from a topic.
 
+|Flag|Description|Default|
+|---|---|---|
+|`-t`, `--type`|Backlog quota type to remove. The valid options are: 
`destination_storage`, `message_age` |destination_storage|
+
 Usage
 ```bash
 $ pulsar-admin topics remove-backlog-quota tenant/namespace/topic

Reply via email to