This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e2774643d9e900244b2758b05be7496ec43c0e58 Author: Masahiro Sakamoto <[email protected]> AuthorDate: Fri Jul 9 10:38:51 2021 +0900 Allow null to be set as namespace level subscription TTL (#11253) ### Motivation If the subscription expiration time is set to a value greater than 0 at the broker level, setting 0 at the namespace level will not disable automatic subscription deletion. For example, suppose `subscriptionExpirationTimeMinutes=5` is written in `broker.conf`. This means that subscriptions to which no consumer is connected will be automatically deleted in 5 minutes. Now, suppose a user runs the following command to disable automatic subscription deletion in the namespace `tenant/ns`. ```sh $ ./bin/pulsar-admin namespaces set-subscription-expiration-time -t 0 tenant/ns ``` However, subscriptions in namespace `tenant/ns` will actually be deleted in 5 minutes like any other namespace. ### Modifications Change the type of `subscription_expiration_time_minutes` in the namespace policies from `int` to `java.lang.Integer` so that it can be set to null. If `subscription_expiration_time_minutes` is 0, automatic subscription deletion is disabled. If it is null, the broker-level setting is used. This fix is similar to https://github.com/apache/pulsar/pull/8178. (cherry picked from commit 406ef7307bb12a8475e9764530ecf9fe2b19918d) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 4 ++-- .../apache/pulsar/broker/admin/v1/Namespaces.java | 12 ++++++++++- .../apache/pulsar/broker/admin/v2/Namespaces.java | 13 ++++++++++- .../service/nonpersistent/NonPersistentTopic.java | 5 ++--- .../broker/service/persistent/PersistentTopic.java | 5 ++--- .../apache/pulsar/broker/admin/AdminApiTest.java | 15 +++++++++++-- .../org/apache/pulsar/client/admin/Namespaces.java | 25 +++++++++++++++++++++- .../pulsar/common/policies/data/Policies.java | 2 +- .../client/admin/internal/NamespacesImpl.java | 23 +++++++++++++++++++- .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +++ .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++++++++ 11 files changed, 105 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ec9de23..19a228c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -780,11 +780,11 @@ public abstract class NamespacesBase extends AdminResource { }); } - protected void internalSetSubscriptionExpirationTime(int expirationTime) { + protected void internalSetSubscriptionExpirationTime(Integer expirationTime) { validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); - if (expirationTime < 0) { + if (expirationTime != null && expirationTime < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time"); } updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 3e7067b..529b8ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -358,7 +358,7 @@ public class Namespaces extends NamespacesBase { @ApiOperation(hidden = true, value = "Get the subscription expiration time for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public int getSubscriptionExpirationTime(@PathParam("property") String property, + public Integer getSubscriptionExpirationTime(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateAdminAccessForTenant(property); validateNamespaceName(property, cluster, namespace); @@ -379,6 +379,16 @@ public class Namespaces extends NamespacesBase { internalSetSubscriptionExpirationTime(expirationTime); } + @DELETE + @Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime") + @ApiOperation(hidden = true, value = "Remove subscription expiration time for namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) + public void removeSubscriptionExpirationTime(@PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + validateNamespaceName(property, cluster, namespace); + internalSetSubscriptionExpirationTime(null); + } @POST @Path("/{property}/{cluster}/{namespace}/antiAffinity") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 4609377..9a1f684 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -314,7 +314,7 @@ public class Namespaces extends NamespacesBase { @ApiOperation(value = "Get the subscription expiration time for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public int getSubscriptionExpirationTime(@PathParam("tenant") String tenant, + public Integer getSubscriptionExpirationTime(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateAdminAccessForTenant(tenant); validateNamespaceName(tenant, namespace); @@ -338,6 +338,17 @@ public class Namespaces extends NamespacesBase { internalSetSubscriptionExpirationTime(expirationTime); } + @DELETE + @Path("/{tenant}/{namespace}/subscriptionExpirationTime") + @ApiOperation(value = "Remove subscription expiration time for namespace") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalSetSubscriptionExpirationTime(null); + } + @GET @Path("/{tenant}/{namespace}/deduplication") @ApiOperation(value = "Get broker side deduplication for all topics in a namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 464cd8e..5807d00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -892,10 +892,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { .orElseThrow(KeeperException.NoNodeException::new); final int defaultExpirationTime = brokerService.pulsar().getConfiguration() .getSubscriptionExpirationTimeMinutes(); + final Integer nsExpirationTime = policies.subscription_expiration_time_minutes; final long expirationTimeMillis = TimeUnit.MINUTES - .toMillis((policies.subscription_expiration_time_minutes <= 0 && defaultExpirationTime > 0) - ? defaultExpirationTime - : policies.subscription_expiration_time_minutes); + .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { subscriptions.forEach((subName, sub) -> { if (sub.getDispatcher() != null diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 325d96d..ad781cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2164,10 +2164,9 @@ public class PersistentTopic extends AbstractTopic .orElseThrow(() -> new KeeperException.NoNodeException()); final int defaultExpirationTime = brokerService.pulsar().getConfiguration() .getSubscriptionExpirationTimeMinutes(); + final Integer nsExpirationTime = policies.subscription_expiration_time_minutes; final long expirationTimeMillis = TimeUnit.MINUTES - .toMillis((policies.subscription_expiration_time_minutes <= 0 && defaultExpirationTime > 0) - ? defaultExpirationTime - : policies.subscription_expiration_time_minutes); + .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { subscriptions.forEach((subName, sub) -> { if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index b145e4d..74b2925 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2770,19 +2770,27 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { public void testSubscriptionExpiry() throws Exception { final String namespace1 = "prop-xyz/sub-gc1"; final String namespace2 = "prop-xyz/sub-gc2"; + final String namespace3 = "prop-xyz/sub-gc3"; final String topic1 = "persistent://" + namespace1 + "/testSubscriptionExpiry"; final String topic2 = "persistent://" + namespace2 + "/testSubscriptionExpiry"; + final String topic3 = "persistent://" + namespace3 + "/testSubscriptionExpiry"; final String sub = "sub1"; admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test")); admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test")); + admin.namespaces().createNamespace(namespace3, Sets.newHashSet("test")); admin.topics().createSubscription(topic1, sub, MessageId.latest); admin.topics().createSubscription(topic2, sub, MessageId.latest); + admin.topics().createSubscription(topic3, sub, MessageId.latest); admin.namespaces().setSubscriptionExpirationTime(namespace1, 0); admin.namespaces().setSubscriptionExpirationTime(namespace2, 1); + admin.namespaces().setSubscriptionExpirationTime(namespace3, 1); + admin.namespaces().removeSubscriptionExpirationTime(namespace3); + + Assert.assertEquals((int) admin.namespaces().getSubscriptionExpirationTime(namespace1), 0); + Assert.assertEquals((int) admin.namespaces().getSubscriptionExpirationTime(namespace2), 1); + Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(namespace3)); - Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace1), 0); - Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace2), 1); Thread.sleep(60000); for (int i = 0; i < 60; i++) { if (admin.topics().getSubscriptions(topic2).size() == 0) { @@ -2792,11 +2800,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { } Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1); Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0); + Assert.assertEquals(admin.topics().getSubscriptions(topic3).size(), 1); admin.topics().delete(topic1); admin.topics().delete(topic2); + admin.topics().delete(topic3); admin.namespaces().deleteNamespace(namespace1); admin.namespaces().deleteNamespace(namespace2); + admin.namespaces().deleteNamespace(namespace3); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 5564180..6e66d3b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -910,7 +910,7 @@ public interface Namespaces { * @throws PulsarAdminException * Unexpected error */ - int getSubscriptionExpirationTime(String namespace) throws PulsarAdminException; + Integer getSubscriptionExpirationTime(String namespace) throws PulsarAdminException; /** * Get the subscription expiration time for a namespace asynchronously. @@ -966,6 +966,29 @@ public interface Namespaces { CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String namespace, int expirationTime); /** + * Remove the subscription expiration time for a namespace. + * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void removeSubscriptionExpirationTime(String namespace) throws PulsarAdminException; + + /** + * Remove the subscription expiration time for a namespace asynchronously. + * + * @param namespace + * Namespace name + */ + CompletableFuture<Void> removeSubscriptionExpirationTimeAsync(String namespace); + + /** * Set anti-affinity group name for a namespace. * <p/> * Request example: diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index b2376f8..ff773f6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -60,7 +60,7 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public Integer message_ttl_in_seconds = null; @SuppressWarnings("checkstyle:MemberName") - public int subscription_expiration_time_minutes = 0; + public Integer subscription_expiration_time_minutes = null; @SuppressWarnings("checkstyle:MemberName") public RetentionPolicies retention_policies = null; public boolean deleted = false; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index ccf7890..eaecfbe 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -698,7 +698,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces { } @Override - public int getSubscriptionExpirationTime(String namespace) throws PulsarAdminException { + public Integer getSubscriptionExpirationTime(String namespace) throws PulsarAdminException { try { return getSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { @@ -754,6 +754,27 @@ public class NamespacesImpl extends BaseResource implements Namespaces { } @Override + public void removeSubscriptionExpirationTime(String namespace) throws PulsarAdminException { + try { + removeSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> removeSubscriptionExpirationTimeAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "subscriptionExpirationTime"); + return asyncDeleteRequest(path); + } + + @Override public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 47e075f..acbda97 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -517,6 +517,9 @@ public class PulsarAdminToolTest { namespaces.run(split("get-subscription-expiration-time myprop/clust/ns1")); verify(mockNamespaces).getSubscriptionExpirationTime("myprop/clust/ns1"); + namespaces.run(split("remove-subscription-expiration-time myprop/clust/ns1")); + verify(mockNamespaces).removeSubscriptionExpirationTime("myprop/clust/ns1"); + namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group")); verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index bae1ae21..a44022e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -424,6 +424,18 @@ public class CmdNamespaces extends CmdBase { } } + @Parameters(commandDescription = "Remove subscription expiration time for a namespace") + private class RemoveSubscriptionExpirationTime extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().removeSubscriptionExpirationTime(namespace); + } + } + @Parameters(commandDescription = "Set Anti-affinity group name for a namespace") private class SetAntiAffinityGroup extends CliCommand { @Parameter(description = "tenant/namespace", required = true) @@ -2330,6 +2342,7 @@ public class CmdNamespaces extends CmdBase { jcommander.addCommand("get-subscription-expiration-time", new GetSubscriptionExpirationTime()); jcommander.addCommand("set-subscription-expiration-time", new SetSubscriptionExpirationTime()); + jcommander.addCommand("remove-subscription-expiration-time", new RemoveSubscriptionExpirationTime()); jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup()); jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup());
