This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2774643d9e900244b2758b05be7496ec43c0e58
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Fri Jul 9 10:38:51 2021 +0900

    Allow null to be set as namespace level subscription TTL (#11253)
    
    ### Motivation
    
    If the subscription expiration time is set to a value greater than 0 at the 
broker level, setting 0 at the namespace level will not disable automatic 
subscription deletion.
    
    For example, suppose `subscriptionExpirationTimeMinutes=5` is written in 
`broker.conf`. This means that subscriptions to which no consumer is connected 
will be automatically deleted in 5 minutes.
    
    Now, suppose a user runs the following command to disable automatic 
subscription deletion in the namespace `tenant/ns`.
    
    ```sh
    $ ./bin/pulsar-admin namespaces set-subscription-expiration-time -t 0 
tenant/ns
    ```
    
    However, subscriptions in namespace `tenant/ns` will actually be deleted in 
5 minutes like any other namespace.
    
    ### Modifications
    
    Change the type of `subscription_expiration_time_minutes` in the namespace 
policies from `int` to `java.lang.Integer` so that it can be set to null.
    
    If `subscription_expiration_time_minutes` is 0, automatic subscription 
deletion is disabled. If it is null, the broker-level setting is used.
    
    This fix is similar to https://github.com/apache/pulsar/pull/8178.
    
    (cherry picked from commit 406ef7307bb12a8475e9764530ecf9fe2b19918d)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 ++--
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 12 ++++++++++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 13 ++++++++++-
 .../service/nonpersistent/NonPersistentTopic.java  |  5 ++---
 .../broker/service/persistent/PersistentTopic.java |  5 ++---
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 15 +++++++++++--
 .../org/apache/pulsar/client/admin/Namespaces.java | 25 +++++++++++++++++++++-
 .../pulsar/common/policies/data/Policies.java      |  2 +-
 .../client/admin/internal/NamespacesImpl.java      | 23 +++++++++++++++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 +++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++++++++
 11 files changed, 105 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ec9de23..19a228c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -780,11 +780,11 @@ public abstract class NamespacesBase extends 
AdminResource {
         });
     }
 
-    protected void internalSetSubscriptionExpirationTime(int expirationTime) {
+    protected void internalSetSubscriptionExpirationTime(Integer 
expirationTime) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
-        if (expirationTime < 0) {
+        if (expirationTime != null && expirationTime < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for subscription expiration time");
         }
         updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> 
{
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 3e7067b..529b8ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -358,7 +358,7 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Get the subscription expiration time 
for the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
-    public int getSubscriptionExpirationTime(@PathParam("property") String 
property,
+    public Integer getSubscriptionExpirationTime(@PathParam("property") String 
property,
             @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
         validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
@@ -379,6 +379,16 @@ public class Namespaces extends NamespacesBase {
         internalSetSubscriptionExpirationTime(expirationTime);
     }
 
+    @DELETE
+    @Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
+    @ApiOperation(hidden = true, value = "Remove subscription expiration time 
for namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+    public void removeSubscriptionExpirationTime(@PathParam("property") String 
property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetSubscriptionExpirationTime(null);
+    }
 
     @POST
     @Path("/{property}/{cluster}/{namespace}/antiAffinity")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 4609377..9a1f684 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -314,7 +314,7 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get the subscription expiration time for the 
namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public int getSubscriptionExpirationTime(@PathParam("tenant") String 
tenant,
+    public Integer getSubscriptionExpirationTime(@PathParam("tenant") String 
tenant,
             @PathParam("namespace") String namespace) {
         validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
@@ -338,6 +338,17 @@ public class Namespaces extends NamespacesBase {
         internalSetSubscriptionExpirationTime(expirationTime);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/subscriptionExpirationTime")
+    @ApiOperation(value = "Remove subscription expiration time for namespace")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")})
+    public void removeSubscriptionExpirationTime(@PathParam("tenant") String 
tenant,
+                                                 @PathParam("namespace") 
String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetSubscriptionExpirationTime(null);
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 464cd8e..5807d00 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -892,10 +892,9 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
                     .orElseThrow(KeeperException.NoNodeException::new);
             final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
                     .getSubscriptionExpirationTimeMinutes();
+            final Integer nsExpirationTime = 
policies.subscription_expiration_time_minutes;
             final long expirationTimeMillis = TimeUnit.MINUTES
-                    .toMillis((policies.subscription_expiration_time_minutes 
<= 0 && defaultExpirationTime > 0)
-                            ? defaultExpirationTime
-                            : policies.subscription_expiration_time_minutes);
+                    .toMillis(nsExpirationTime == null ? defaultExpirationTime 
: nsExpirationTime);
             if (expirationTimeMillis > 0) {
                 subscriptions.forEach((subName, sub) -> {
                     if (sub.getDispatcher() != null
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 325d96d..ad781cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2164,10 +2164,9 @@ public class PersistentTopic extends AbstractTopic
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
                     .getSubscriptionExpirationTimeMinutes();
+            final Integer nsExpirationTime = 
policies.subscription_expiration_time_minutes;
             final long expirationTimeMillis = TimeUnit.MINUTES
-                    .toMillis((policies.subscription_expiration_time_minutes 
<= 0 && defaultExpirationTime > 0)
-                            ? defaultExpirationTime
-                            : policies.subscription_expiration_time_minutes);
+                    .toMillis(nsExpirationTime == null ? defaultExpirationTime 
: nsExpirationTime);
             if (expirationTimeMillis > 0) {
                 subscriptions.forEach((subName, sub) -> {
                     if (sub.dispatcher != null && 
sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index b145e4d..74b2925 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2770,19 +2770,27 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     public void testSubscriptionExpiry() throws Exception {
         final String namespace1 = "prop-xyz/sub-gc1";
         final String namespace2 = "prop-xyz/sub-gc2";
+        final String namespace3 = "prop-xyz/sub-gc3";
         final String topic1 = "persistent://" + namespace1 + 
"/testSubscriptionExpiry";
         final String topic2 = "persistent://" + namespace2 + 
"/testSubscriptionExpiry";
+        final String topic3 = "persistent://" + namespace3 + 
"/testSubscriptionExpiry";
         final String sub = "sub1";
 
         admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
         admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace3, 
Sets.newHashSet("test"));
         admin.topics().createSubscription(topic1, sub, MessageId.latest);
         admin.topics().createSubscription(topic2, sub, MessageId.latest);
+        admin.topics().createSubscription(topic3, sub, MessageId.latest);
         admin.namespaces().setSubscriptionExpirationTime(namespace1, 0);
         admin.namespaces().setSubscriptionExpirationTime(namespace2, 1);
+        admin.namespaces().setSubscriptionExpirationTime(namespace3, 1);
+        admin.namespaces().removeSubscriptionExpirationTime(namespace3);
+
+        Assert.assertEquals((int) 
admin.namespaces().getSubscriptionExpirationTime(namespace1), 0);
+        Assert.assertEquals((int) 
admin.namespaces().getSubscriptionExpirationTime(namespace2), 1);
+        
Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(namespace3));
 
-        
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace1),
 0);
-        
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace2),
 1);
         Thread.sleep(60000);
         for (int i = 0; i < 60; i++) {
             if (admin.topics().getSubscriptions(topic2).size() == 0) {
@@ -2792,11 +2800,14 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         }
         Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1);
         Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0);
+        Assert.assertEquals(admin.topics().getSubscriptions(topic3).size(), 1);
 
         admin.topics().delete(topic1);
         admin.topics().delete(topic2);
+        admin.topics().delete(topic3);
         admin.namespaces().deleteNamespace(namespace1);
         admin.namespaces().deleteNamespace(namespace2);
+        admin.namespaces().deleteNamespace(namespace3);
     }
 
     @Test
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 5564180..6e66d3b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -910,7 +910,7 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    int getSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException;
+    Integer getSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException;
 
     /**
      * Get the subscription expiration time for a namespace asynchronously.
@@ -966,6 +966,29 @@ public interface Namespaces {
     CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String 
namespace, int expirationTime);
 
     /**
+     * Remove the subscription expiration time for a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Remove the subscription expiration time for a namespace asynchronously.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> removeSubscriptionExpirationTimeAsync(String 
namespace);
+
+    /**
      * Set anti-affinity group name for a namespace.
      * <p/>
      * Request example:
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index b2376f8..ff773f6 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -60,7 +60,7 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public Integer message_ttl_in_seconds = null;
     @SuppressWarnings("checkstyle:MemberName")
-    public int subscription_expiration_time_minutes = 0;
+    public Integer subscription_expiration_time_minutes = null;
     @SuppressWarnings("checkstyle:MemberName")
     public RetentionPolicies retention_policies = null;
     public boolean deleted = false;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index ccf7890..eaecfbe 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -698,7 +698,7 @@ public class NamespacesImpl extends BaseResource implements 
Namespaces {
     }
 
     @Override
-    public int getSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException {
+    public Integer getSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException {
         try {
             return 
getSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
@@ -754,6 +754,27 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
+    public void removeSubscriptionExpirationTime(String namespace) throws 
PulsarAdminException {
+        try {
+            
removeSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> 
removeSubscriptionExpirationTimeAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public void setNamespaceAntiAffinityGroup(String namespace, String 
namespaceAntiAffinityGroup)
             throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 47e075f..acbda97 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -517,6 +517,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-subscription-expiration-time 
myprop/clust/ns1"));
         
verify(mockNamespaces).getSubscriptionExpirationTime("myprop/clust/ns1");
 
+        namespaces.run(split("remove-subscription-expiration-time 
myprop/clust/ns1"));
+        
verify(mockNamespaces).removeSubscriptionExpirationTime("myprop/clust/ns1");
+
         namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g 
group"));
         
verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", 
"group");
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index bae1ae21..a44022e 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -424,6 +424,18 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove subscription expiration time for 
a namespace")
+    private class RemoveSubscriptionExpirationTime extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            
getAdmin().namespaces().removeSubscriptionExpirationTime(namespace);
+        }
+    }
+
     @Parameters(commandDescription = "Set Anti-affinity group name for a 
namespace")
     private class SetAntiAffinityGroup extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -2330,6 +2342,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-subscription-expiration-time", new 
GetSubscriptionExpirationTime());
         jcommander.addCommand("set-subscription-expiration-time", new 
SetSubscriptionExpirationTime());
+        jcommander.addCommand("remove-subscription-expiration-time", new 
RemoveSubscriptionExpirationTime());
 
         jcommander.addCommand("get-anti-affinity-group", new 
GetAntiAffinityGroup());
         jcommander.addCommand("set-anti-affinity-group", new 
SetAntiAffinityGroup());

Reply via email to