This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 549994b Enable users to set subscription expiration time for each
namespace (#6851)
549994b is described below
commit 549994b4e0a85ac203fe6944636abe5681281296
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Sat May 2 00:35:01 2020 +0900
Enable users to set subscription expiration time for each namespace (#6851)
## Motivation
We can automatically delete inactive subscriptions by setting
`subscriptionExpirationTimeMinutes` in broker.conf to a value greater than 0.
```sh
# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
```
However, since this setting applies to every topic on the broker, it has to
stay at 0 if there is even one topic whose subscriptions must not be deleted.
## Modifications
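Enable users to set a subscription expiration time, in minutes, for each
namespace. A namespace value greater than 0 overrides
`subscriptionExpirationTimeMinutes` in broker.conf; when the namespace value
is 0 (the default), the broker-level setting still applies.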
```sh
$ ./bin/pulsar-admin namespaces set-subscription-expiration-time --time 60 \
    tenant1/ns1
$ ./bin/pulsar-admin namespaces get-subscription-expiration-time tenant1/ns1
60
```
---
.../apache/pulsar/broker/ServiceConfiguration.java | 4 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 39 ++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 27 ++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 26 ++++++++
.../pulsar/broker/service/BrokerService.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 36 ++++++++---
.../apache/pulsar/broker/admin/AdminApiTest.java | 35 ++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 74 ++++++++++++++++++++++
.../client/admin/internal/NamespacesImpl.java | 56 ++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 6 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 30 +++++++++
.../pulsar/common/policies/data/Policies.java | 9 ++-
12 files changed, 330 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a50611a..b9d6dcb 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -307,7 +307,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "How long to delete inactive subscriptions from last consuming."
+ " When it is 0, inactive subscriptions are not deleted
automatically"
)
- private long subscriptionExpirationTimeMinutes = 0;
+ private int subscriptionExpirationTimeMinutes = 0;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
@@ -319,7 +319,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired
subscription"
)
- private long subscriptionExpiryCheckIntervalInMinutes = 5;
+ private int subscriptionExpiryCheckIntervalInMinutes = 5;
@FieldContext(
category = CATEGORY_POLICIES,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5de3915..b0504ee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -562,6 +562,45 @@ public abstract class NamespacesBase extends AdminResource
{
}
}
+ protected void internalSetSubscriptionExpirationTime(int expirationTime) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+
+ if (expirationTime < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for subscription expiration time");
+ }
+
+ Entry<Policies, Stat> policiesNode = null;
+
+ try {
+ // Force to read the data s.t. the watch to the cache content is
setup.
+ policiesNode = policiesCache().getWithStat(path(POLICIES,
namespaceName.toString())).orElseThrow(
+ () -> new RestException(Status.NOT_FOUND, "Namespace " +
namespaceName + " does not exist"));
+ policiesNode.getKey().subscription_expiration_time_minutes =
expirationTime;
+
+ // Write back the new policies into zookeeper
+ globalZk().setData(path(POLICIES, namespaceName.toString()),
+ jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
+ policiesCache().invalidate(path(POLICIES,
namespaceName.toString()));
+
+ log.info("[{}] Successfully updated the subscription expiration
time on namespace {}", clientAppId(),
+ namespaceName);
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update the subscription expiration time
for namespace {}: does not exist",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ } catch (KeeperException.BadVersionException e) {
+ log.warn(
+ "[{}] Failed to update the subscription expiration time on
namespace {} expected policy node version={} : concurrent modification",
+ clientAppId(), namespaceName,
policiesNode.getValue().getVersion());
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } catch (Exception e) {
+ log.error("[{}] Failed to update the subscription expiration time
on namespace {}", clientAppId(),
+ namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 957a23a..700381d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -339,6 +339,33 @@ public class Namespaces extends NamespacesBase {
internalSetNamespaceMessageTTL(messageTTL);
}
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
+ @ApiOperation(hidden = true, value = "Get the subscription expiration time
for the namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
+ public int getSubscriptionExpirationTime(@PathParam("property") String
property,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace) {
+ validateAdminAccessForTenant(property);
+ validateNamespaceName(property, cluster, namespace);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.subscription_expiration_time_minutes;
+ }
+
+ @POST
+ @Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
+ @ApiOperation(hidden = true, value = "Set subscription expiration time in
minutes for namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
+ @ApiResponse(code = 412, message = "Invalid expiration time") })
+ public void setSubscriptionExpirationTime(@PathParam("property") String
property,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace, int expirationTime) {
+ validateNamespaceName(property, cluster, namespace);
+ internalSetSubscriptionExpirationTime(expirationTime);
+ }
+
+
@POST
@Path("/{property}/{cluster}/{namespace}/antiAffinity")
@ApiOperation(value = "Set anti-affinity group for a namespace")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index dd2c57e..95cf65b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -290,6 +290,32 @@ public class Namespaces extends NamespacesBase {
internalSetNamespaceMessageTTL(messageTTL);
}
+ @GET
+ @Path("/{tenant}/{namespace}/subscriptionExpirationTime")
+ @ApiOperation(value = "Get the subscription expiration time for the
namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
+ public int getSubscriptionExpirationTime(@PathParam("tenant") String
tenant,
+ @PathParam("namespace") String namespace) {
+ validateAdminAccessForTenant(tenant);
+ validateNamespaceName(tenant, namespace);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.subscription_expiration_time_minutes;
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/subscriptionExpirationTime")
+ @ApiOperation(value = "Set subscription expiration time in minutes for
namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
+ @ApiResponse(code = 412, message = "Invalid expiration time") })
+ public void setSubscriptionExpirationTime(@PathParam("tenant") String
tenant,
+ @PathParam("namespace") String namespace, int expirationTime) {
+ validateNamespaceName(tenant, namespace);
+ internalSetSubscriptionExpirationTime(expirationTime);
+ }
+
@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all
topics in a namespace")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4c935d1..5d28fed 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -439,7 +439,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
// Inactive subscriber checker
- if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes()
> 0) {
+ if
(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes() > 0)
{
long subscriptionExpiryCheckIntervalInSeconds =
TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8086209..436f1cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1694,15 +1694,33 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void checkInactiveSubscriptions() {
- final long expirationTime =
TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
- if (expirationTime <= 0) return;
- subscriptions.forEach((subName, sub) -> {
- if (sub.dispatcher != null &&
sub.dispatcher.isConsumerConnected()) return;
- if (System.currentTimeMillis() - sub.cursor.getLastActive() >
expirationTime) {
- sub.delete().thenAccept(
- v -> log.info("[{}][{}] The subscription was deleted
due to expiration", topic, subName));
+ TopicName name = TopicName.get(topic);
+ try {
+ Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, name.getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
+ final int defaultExpirationTime =
brokerService.pulsar().getConfiguration()
+ .getSubscriptionExpirationTimeMinutes();
+ final long expirationTimeMillis = TimeUnit.MINUTES
+ .toMillis((policies.subscription_expiration_time_minutes
<= 0 && defaultExpirationTime > 0)
+ ? defaultExpirationTime
+ : policies.subscription_expiration_time_minutes);
+ if (expirationTimeMillis > 0) {
+ subscriptions.forEach((subName, sub) -> {
+ if (sub.dispatcher != null &&
sub.dispatcher.isConsumerConnected()) {
+ return;
+ }
+ if (System.currentTimeMillis() -
sub.cursor.getLastActive() > expirationTimeMillis) {
+ sub.delete().thenAccept(v -> log.info("[{}][{}] The
subscription was deleted due to expiration",
+ topic, subName));
+ }
+ });
}
- });
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Error getting policies", topic);
+ }
+ }
}
@Override
@@ -2115,4 +2133,4 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public CompactedTopic getCompactedTopic() {
return compactedTopic;
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7f138dd..cf04cc2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -159,6 +159,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setMessageExpiryCheckIntervalInMinutes(1);
+ conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
super.internalSetup();
@@ -2425,6 +2427,39 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp
> 0L);
}
+ @Test(timeOut = 150000)
+ public void testSubscriptionExpiry() throws Exception {
+ final String namespace1 = "prop-xyz/sub-gc1";
+ final String namespace2 = "prop-xyz/sub-gc2";
+ final String topic1 = "persistent://" + namespace1 +
"/testSubscriptionExpiry";
+ final String topic2 = "persistent://" + namespace2 +
"/testSubscriptionExpiry";
+ final String sub = "sub1";
+
+ admin.namespaces().createNamespace(namespace1,
Sets.newHashSet("test"));
+ admin.namespaces().createNamespace(namespace2,
Sets.newHashSet("test"));
+ admin.topics().createSubscription(topic1, sub, MessageId.latest);
+ admin.topics().createSubscription(topic2, sub, MessageId.latest);
+ admin.namespaces().setSubscriptionExpirationTime(namespace1, 0);
+ admin.namespaces().setSubscriptionExpirationTime(namespace2, 1);
+
+
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace1),
0);
+
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace2),
1);
+ Thread.sleep(60000);
+ for (int i = 0; i < 60; i++) {
+ if (admin.topics().getSubscriptions(topic2).size() == 0) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1);
+ Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0);
+
+ admin.topics().delete(topic1);
+ admin.topics().delete(topic2);
+ admin.namespaces().deleteNamespace(namespace1);
+ admin.namespaces().deleteNamespace(namespace2);
+ }
+
@Test
public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
admin.clusters().createCluster("usw", new ClusterData());
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index e397600..7cb7349 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -776,6 +776,80 @@ public interface Namespaces {
CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int
ttlInSeconds);
/**
+ * Get the subscription expiration time for a namespace.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>1440</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ int getSubscriptionExpirationTime(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Get the subscription expiration time for a namespace asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>1440</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String
namespace);
+
+ /**
+ * Set the subscription expiration time in minutes for all the topics
within a namespace.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>1440</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param expirationTime
+ * Expiration time values for all subscriptions for all topics
in this namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setSubscriptionExpirationTime(String namespace, int expirationTime)
throws PulsarAdminException;
+
+ /**
+ * Set the subscription expiration time in minutes for all the topics
within a namespace asynchronously.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>1440</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param expirationTime
+ * Expiration time values for all subscriptions for all topics
in this namespace
+ */
+ CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String
namespace, int expirationTime);
+
+ /**
* Set anti-affinity group name for a namespace.
* <p/>
* Request example:
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index a013adf..1a9f010 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -600,6 +600,62 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public int getSubscriptionExpirationTime(String namespace) throws
PulsarAdminException {
+ try {
+ return
getSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Integer>
getSubscriptionExpirationTimeAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer expirationTime) {
+ future.complete(expirationTime);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setSubscriptionExpirationTime(String namespace, int
expirationTime)
+ throws PulsarAdminException {
+ try {
+ setSubscriptionExpirationTimeAsync(namespace,
expirationTime).get(this.readTimeoutMs,
+ TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String
namespace, int expirationTime) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
+ return asyncPostRequest(path, Entity.entity(expirationTime,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void setNamespaceAntiAffinityGroup(String namespace, String
namespaceAntiAffinityGroup)
throws PulsarAdminException {
try {
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a68f7bf..7b5d01d 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -356,6 +356,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 300"));
verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 300);
+ namespaces.run(split("set-subscription-expiration-time
myprop/clust/ns1 -t 60"));
+
verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60);
+
namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1",
true);
@@ -376,6 +379,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-message-ttl myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
+ namespaces.run(split("get-subscription-expiration-time
myprop/clust/ns1"));
+
verify(mockNamespaces).getSubscriptionExpirationTime("myprop/clust/ns1");
+
namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g
group"));
verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1",
"group");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index e90e856..78d034f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -300,6 +300,21 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Set subscription expiration time for a
namespace")
+ private class SetSubscriptionExpirationTime extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-t", "--time" }, description = "Subscription
expiration time in minutes", required = true)
+ private int expirationTime;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setSubscriptionExpirationTime(namespace,
expirationTime);
+ }
+ }
+
@Parameters(commandDescription = "Set Anti-affinity group name for a
namespace")
private class SetAntiAffinityGroup extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -568,6 +583,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get subscription expiration time for a
namespace")
+ private class GetSubscriptionExpirationTime extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getSubscriptionExpirationTime(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Unload a namespace from the current
serving broker")
private class Unload extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
@@ -1617,6 +1644,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
+ jcommander.addCommand("get-subscription-expiration-time", new
GetSubscriptionExpirationTime());
+ jcommander.addCommand("set-subscription-expiration-time", new
SetSubscriptionExpirationTime());
+
jcommander.addCommand("get-anti-affinity-group", new
GetAntiAffinityGroup());
jcommander.addCommand("set-anti-affinity-group", new
SetAntiAffinityGroup());
jcommander.addCommand("get-anti-affinity-namespaces", new
GetAntiAffinityNamespaces());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 4e7a8a0..d54e0c7 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -60,6 +60,8 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public int message_ttl_in_seconds = 0;
@SuppressWarnings("checkstyle:MemberName")
+ public int subscription_expiration_time_minutes = 0;
+ @SuppressWarnings("checkstyle:MemberName")
public RetentionPolicies retention_policies = null;
public boolean deleted = false;
public String antiAffinityGroup;
@@ -117,7 +119,7 @@ public class Policies {
clusterSubscribeRate, deduplicationEnabled,
autoTopicCreationOverride,
autoSubscriptionCreationOverride, persistence,
bundles, latency_stats_sample_rate,
- message_ttl_in_seconds, retention_policies,
+ message_ttl_in_seconds, subscription_expiration_time_minutes,
retention_policies,
encryption_required, delayed_delivery_policies,
subscription_auth_mode,
antiAffinityGroup, max_producers_per_topic,
@@ -152,6 +154,7 @@ public class Policies {
&& Objects.equals(latency_stats_sample_rate,
other.latency_stats_sample_rate)
&& Objects.equals(message_ttl_in_seconds,
other.message_ttl_in_seconds)
+ && Objects.equals(subscription_expiration_time_minutes,
other.subscription_expiration_time_minutes)
&& Objects.equals(retention_policies,
other.retention_policies)
&& Objects.equals(encryption_required,
other.encryption_required)
&& Objects.equals(delayed_delivery_policies,
other.delayed_delivery_policies)
@@ -207,7 +210,9 @@ public class Policies {
.add("publishMaxMessageRate", publishMaxMessageRate)
.add("latency_stats_sample_rate", latency_stats_sample_rate)
.add("antiAffinityGroup", antiAffinityGroup)
- .add("message_ttl_in_seconds",
message_ttl_in_seconds).add("retention_policies", retention_policies)
+ .add("message_ttl_in_seconds", message_ttl_in_seconds)
+ .add("subscription_expiration_time_minutes",
subscription_expiration_time_minutes)
+ .add("retention_policies", retention_policies)
.add("deleted", deleted)
.add("encryption_required", encryption_required)
.add("delayed_delivery_policies", delayed_delivery_policies)