This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 549994b  Enable users to set subscription expiration time for each 
namespace (#6851)
549994b is described below

commit 549994b4e0a85ac203fe6944636abe5681281296
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Sat May 2 00:35:01 2020 +0900

    Enable users to set subscription expiration time for each namespace (#6851)
    
    ## Motivation
    
    We can automatically delete inactive subscriptions by setting 
`subscriptionExpirationTimeMinutes` in broker.conf to a value greater than 0.
    ```sh
    # How long to delete inactive subscriptions from last consuming
    # When it is 0, inactive subscriptions are not deleted automatically
    subscriptionExpirationTimeMinutes=0
    ```
    
    However, since this setting value applies to all topics, we have to set it 
to 0 if there is even one topic whose subscriptions should not be deleted.
    
    ### Modifications
    
    Enable users to set a subscription expiration time for each namespace. This 
value overrides `subscriptionExpirationTimeMinutes` in broker.conf.
    ```sh
    $ ./bin/pulsar-admin namespaces set-subscription-expiration-time --time 60 
tenant1/ns1
    $ ./bin/pulsar-admin namespaces get-subscription-expiration-time tenant1/ns1
    
    60
    ```
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  4 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 39 ++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 27 ++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 26 ++++++++
 .../pulsar/broker/service/BrokerService.java       |  2 +-
 .../broker/service/persistent/PersistentTopic.java | 36 ++++++++---
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 35 ++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 74 ++++++++++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      | 56 ++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  6 ++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 30 +++++++++
 .../pulsar/common/policies/data/Policies.java      |  9 ++-
 12 files changed, 330 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a50611a..b9d6dcb 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -307,7 +307,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "How long to delete inactive subscriptions from last consuming."
             + " When it is 0, inactive subscriptions are not deleted 
automatically"
     )
-    private long subscriptionExpirationTimeMinutes = 0;
+    private int subscriptionExpirationTimeMinutes = 0;
     @FieldContext(
             category = CATEGORY_POLICIES,
             dynamic = true,
@@ -319,7 +319,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         category = CATEGORY_POLICIES,
         doc = "How frequently to proactively check and purge expired 
subscription"
     )
-    private long subscriptionExpiryCheckIntervalInMinutes = 5;
+    private int subscriptionExpiryCheckIntervalInMinutes = 5;
 
     @FieldContext(
         category = CATEGORY_POLICIES,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5de3915..b0504ee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -562,6 +562,45 @@ public abstract class NamespacesBase extends AdminResource 
{
         }
     }
 
+    protected void internalSetSubscriptionExpirationTime(int expirationTime) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        if (expirationTime < 0) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for subscription expiration time");
+        }
+
+        Entry<Policies, Stat> policiesNode = null;
+
+        try {
+            // Force to read the data s.t. the watch to the cache content is 
setup.
+            policiesNode = policiesCache().getWithStat(path(POLICIES, 
namespaceName.toString())).orElseThrow(
+                    () -> new RestException(Status.NOT_FOUND, "Namespace " + 
namespaceName + " does not exist"));
+            policiesNode.getKey().subscription_expiration_time_minutes = 
expirationTime;
+
+            // Write back the new policies into zookeeper
+            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), 
policiesNode.getValue().getVersion());
+            policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
+
+            log.info("[{}] Successfully updated the subscription expiration 
time on namespace {}", clientAppId(),
+                    namespaceName);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update the subscription expiration time 
for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn(
+                    "[{}] Failed to update the subscription expiration time on 
namespace {} expected policy node version={} : concurrent modification",
+                    clientAppId(), namespaceName, 
policiesNode.getValue().getVersion());
+            throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update the subscription expiration time 
on namespace {}", clientAppId(),
+                    namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, 
AutoTopicCreationOverride autoTopicCreationOverride) {
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         validateAdminAccessForTenant(namespaceName.getTenant());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 957a23a..700381d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -339,6 +339,33 @@ public class Namespaces extends NamespacesBase {
         internalSetNamespaceMessageTTL(messageTTL);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
+    @ApiOperation(hidden = true, value = "Get the subscription expiration time 
for the namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+    public int getSubscriptionExpirationTime(@PathParam("property") String 
property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
+        validateAdminAccessForTenant(property);
+        validateNamespaceName(property, cluster, namespace);
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        return policies.subscription_expiration_time_minutes;
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
+    @ApiOperation(hidden = true, value = "Set subscription expiration time in 
minutes for namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid expiration time") })
+    public void setSubscriptionExpirationTime(@PathParam("property") String 
property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace, int expirationTime) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetSubscriptionExpirationTime(expirationTime);
+    }
+
+
     @POST
     @Path("/{property}/{cluster}/{namespace}/antiAffinity")
     @ApiOperation(value = "Set anti-affinity group for a namespace")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index dd2c57e..95cf65b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -290,6 +290,32 @@ public class Namespaces extends NamespacesBase {
         internalSetNamespaceMessageTTL(messageTTL);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/subscriptionExpirationTime")
+    @ApiOperation(value = "Get the subscription expiration time for the 
namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
+    public int getSubscriptionExpirationTime(@PathParam("tenant") String 
tenant,
+            @PathParam("namespace") String namespace) {
+        validateAdminAccessForTenant(tenant);
+        validateNamespaceName(tenant, namespace);
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        return policies.subscription_expiration_time_minutes;
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/subscriptionExpirationTime")
+    @ApiOperation(value = "Set subscription expiration time in minutes for 
namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid expiration time") })
+    public void setSubscriptionExpirationTime(@PathParam("tenant") String 
tenant,
+            @PathParam("namespace") String namespace, int expirationTime) {
+        validateNamespaceName(tenant, namespace);
+        internalSetSubscriptionExpirationTime(expirationTime);
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all 
topics in a namespace")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4c935d1..5d28fed 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -439,7 +439,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
 
         // Inactive subscriber checker
-        if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() 
> 0) {
+        if 
(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes() > 0) 
{
             long subscriptionExpiryCheckIntervalInSeconds =
                     
TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
             
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8086209..436f1cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1694,15 +1694,33 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public void checkInactiveSubscriptions() {
-        final long expirationTime = 
TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
-        if (expirationTime <= 0) return;
-        subscriptions.forEach((subName, sub) -> {
-            if (sub.dispatcher != null && 
sub.dispatcher.isConsumerConnected()) return;
-            if (System.currentTimeMillis() - sub.cursor.getLastActive() > 
expirationTime) {
-                sub.delete().thenAccept(
-                        v -> log.info("[{}][{}] The subscription was deleted 
due to expiration", topic, subName));
+        TopicName name = TopicName.get(topic);
+        try {
+            Policies policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, name.getNamespace()))
+                    .orElseThrow(() -> new KeeperException.NoNodeException());
+            final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
+                    .getSubscriptionExpirationTimeMinutes();
+            final long expirationTimeMillis = TimeUnit.MINUTES
+                    .toMillis((policies.subscription_expiration_time_minutes 
<= 0 && defaultExpirationTime > 0)
+                            ? defaultExpirationTime
+                            : policies.subscription_expiration_time_minutes);
+            if (expirationTimeMillis > 0) {
+                subscriptions.forEach((subName, sub) -> {
+                    if (sub.dispatcher != null && 
sub.dispatcher.isConsumerConnected()) {
+                        return;
+                    }
+                    if (System.currentTimeMillis() - 
sub.cursor.getLastActive() > expirationTimeMillis) {
+                        sub.delete().thenAccept(v -> log.info("[{}][{}] The 
subscription was deleted due to expiration",
+                                topic, subName));
+                    }
+                });
             }
-        });
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Error getting policies", topic);
+            }
+        }
     }
 
     @Override
@@ -2115,4 +2133,4 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public CompactedTopic getCompactedTopic() {
         return compactedTopic;
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7f138dd..cf04cc2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -159,6 +159,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         conf.setMessageExpiryCheckIntervalInMinutes(1);
+        conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
 
         super.internalSetup();
 
@@ -2425,6 +2427,39 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp
 > 0L);
     }
 
+    @Test(timeOut = 150000)
+    public void testSubscriptionExpiry() throws Exception {
+        final String namespace1 = "prop-xyz/sub-gc1";
+        final String namespace2 = "prop-xyz/sub-gc2";
+        final String topic1 = "persistent://" + namespace1 + 
"/testSubscriptionExpiry";
+        final String topic2 = "persistent://" + namespace2 + 
"/testSubscriptionExpiry";
+        final String sub = "sub1";
+
+        admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
+        admin.topics().createSubscription(topic1, sub, MessageId.latest);
+        admin.topics().createSubscription(topic2, sub, MessageId.latest);
+        admin.namespaces().setSubscriptionExpirationTime(namespace1, 0);
+        admin.namespaces().setSubscriptionExpirationTime(namespace2, 1);
+
+        
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace1),
 0);
+        
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace2),
 1);
+        Thread.sleep(60000);
+        for (int i = 0; i < 60; i++) {
+            if (admin.topics().getSubscriptions(topic2).size() == 0) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1);
+        Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0);
+
+        admin.topics().delete(topic1);
+        admin.topics().delete(topic2);
+        admin.namespaces().deleteNamespace(namespace1);
+        admin.namespaces().deleteNamespace(namespace2);
+    }
+
     @Test
     public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
         admin.clusters().createCluster("usw", new ClusterData());
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index e397600..7cb7349 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -776,6 +776,80 @@ public interface Namespaces {
     CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int 
ttlInSeconds);
 
     /**
+     * Get the subscription expiration time for a namespace.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>1440</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Get the subscription expiration time for a namespace asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>1440</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String 
namespace);
+
+    /**
+     * Set the subscription expiration time in minutes for all the topics 
within a namespace.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>1440</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param expirationTime
+     *            Expiration time values for all subscriptions for all topics 
in this namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setSubscriptionExpirationTime(String namespace, int expirationTime) 
throws PulsarAdminException;
+
+    /**
+     * Set the subscription expiration time in minutes for all the topics 
within a namespace asynchronously.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>1440</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param expirationTime
+     *            Expiration time values for all subscriptions for all topics 
in this namespace
+     */
+    CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String 
namespace, int expirationTime);
+
+    /**
      * Set anti-affinity group name for a namespace.
      * <p/>
      * Request example:
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index a013adf..1a9f010 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -600,6 +600,62 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
+    public int getSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException {
+        try {
+            return 
getSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> 
getSubscriptionExpirationTimeAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path, new InvocationCallback<Integer>() {
+            @Override
+            public void completed(Integer expirationTime) {
+                future.complete(expirationTime);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                
future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public void setSubscriptionExpirationTime(String namespace, int 
expirationTime)
+            throws PulsarAdminException {
+        try {
+            setSubscriptionExpirationTimeAsync(namespace, 
expirationTime).get(this.readTimeoutMs,
+                    TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String 
namespace, int expirationTime) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
+        return asyncPostRequest(path, Entity.entity(expirationTime, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void setNamespaceAntiAffinityGroup(String namespace, String 
namespaceAntiAffinityGroup)
             throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a68f7bf..7b5d01d 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -356,6 +356,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 300"));
         verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 300);
 
+        namespaces.run(split("set-subscription-expiration-time 
myprop/clust/ns1 -t 60"));
+        
verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60);
+
         namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
         verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", 
true);
 
@@ -376,6 +379,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-message-ttl myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
 
+        namespaces.run(split("get-subscription-expiration-time 
myprop/clust/ns1"));
+        
verify(mockNamespaces).getSubscriptionExpirationTime("myprop/clust/ns1");
+
         namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g 
group"));
         
verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", 
"group");
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index e90e856..78d034f 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -300,6 +300,21 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Set subscription expiration time for a 
namespace")
+    private class SetSubscriptionExpirationTime extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-t", "--time" }, description = "Subscription 
expiration time in minutes", required = true)
+        private int expirationTime;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setSubscriptionExpirationTime(namespace, 
expirationTime);
+        }
+    }
+
     @Parameters(commandDescription = "Set Anti-affinity group name for a 
namespace")
     private class SetAntiAffinityGroup extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -568,6 +583,18 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get subscription expiration time for a 
namespace")
+    private class GetSubscriptionExpirationTime extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getSubscriptionExpirationTime(namespace));
+        }
+    }
+
     @Parameters(commandDescription = "Unload a namespace from the current 
serving broker")
     private class Unload extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -1617,6 +1644,9 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
 
+        jcommander.addCommand("get-subscription-expiration-time", new 
GetSubscriptionExpirationTime());
+        jcommander.addCommand("set-subscription-expiration-time", new 
SetSubscriptionExpirationTime());
+
         jcommander.addCommand("get-anti-affinity-group", new 
GetAntiAffinityGroup());
         jcommander.addCommand("set-anti-affinity-group", new 
SetAntiAffinityGroup());
         jcommander.addCommand("get-anti-affinity-namespaces", new 
GetAntiAffinityNamespaces());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 4e7a8a0..d54e0c7 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -60,6 +60,8 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public int message_ttl_in_seconds = 0;
     @SuppressWarnings("checkstyle:MemberName")
+    public int subscription_expiration_time_minutes = 0;
+    @SuppressWarnings("checkstyle:MemberName")
     public RetentionPolicies retention_policies = null;
     public boolean deleted = false;
     public String antiAffinityGroup;
@@ -117,7 +119,7 @@ public class Policies {
                 clusterSubscribeRate, deduplicationEnabled, 
autoTopicCreationOverride,
                 autoSubscriptionCreationOverride, persistence,
                 bundles, latency_stats_sample_rate,
-                message_ttl_in_seconds, retention_policies,
+                message_ttl_in_seconds, subscription_expiration_time_minutes, 
retention_policies,
                 encryption_required, delayed_delivery_policies,
                 subscription_auth_mode,
                 antiAffinityGroup, max_producers_per_topic,
@@ -152,6 +154,7 @@ public class Policies {
                     && Objects.equals(latency_stats_sample_rate, 
other.latency_stats_sample_rate)
                     && Objects.equals(message_ttl_in_seconds,
                             other.message_ttl_in_seconds)
+                    && Objects.equals(subscription_expiration_time_minutes, 
other.subscription_expiration_time_minutes)
                     && Objects.equals(retention_policies, 
other.retention_policies)
                     && Objects.equals(encryption_required, 
other.encryption_required)
                     && Objects.equals(delayed_delivery_policies, 
other.delayed_delivery_policies)
@@ -207,7 +210,9 @@ public class Policies {
                 .add("publishMaxMessageRate", publishMaxMessageRate)
                 .add("latency_stats_sample_rate", latency_stats_sample_rate)
                 .add("antiAffinityGroup", antiAffinityGroup)
-                .add("message_ttl_in_seconds", 
message_ttl_in_seconds).add("retention_policies", retention_policies)
+                .add("message_ttl_in_seconds", message_ttl_in_seconds)
+                .add("subscription_expiration_time_minutes", 
subscription_expiration_time_minutes)
+                .add("retention_policies", retention_policies)
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
                 .add("delayed_delivery_policies", delayed_delivery_policies)

Reply via email to