This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4a441db Support topic-level inactiveTopicPolicies (#7986)
4a441db is described below
commit 4a441db363efe1adeefcfca83cade5232c5b1666
Author: feynmanlin <[email protected]>
AuthorDate: Sat Sep 5 21:20:50 2020 +0800
Support topic-level inactiveTopicPolicies (#7986)
### Motivation
Support topic-level inactiveTopicPolicies
### Modifications
Support set/get/remove inactiveTopicPolicies policy on topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 18 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 64 +++++++
.../broker/service/persistent/PersistentTopic.java | 49 ++++-
.../broker/service/InactiveTopicDeleteTest.java | 207 ++++++++++++++++++++-
.../org/apache/pulsar/client/admin/Topics.java | 47 +++++
.../pulsar/client/admin/internal/TopicsImpl.java | 80 ++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 8 +
.../org/apache/pulsar/admin/cli/CmdTopics.java | 67 +++++++
.../pulsar/common/policies/data/TopicPolicies.java | 11 +-
9 files changed, 528 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c82d136..b6a0b2d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,9 +97,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -843,6 +842,21 @@ public class PersistentTopicsBase extends AdminResource {
return completableFuture;
}
+ protected CompletableFuture<Void>
internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
+ TopicPolicies topicPolicies = null;
+ try {
+ topicPolicies =
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.error("Topic {} policies cache have not init.", topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
+ }
+ if (topicPolicies == null) {
+ topicPolicies = new TopicPolicies();
+ }
+ topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
+ }
+
private CompletableFuture<Void>
internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName
topicName) {
return
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(optionalTopic -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c897124..936a7df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -57,6 +57,7 @@ import
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -378,6 +379,69 @@ public class PersistentTopics extends PersistentTopicsBase
{
}
@GET
+ @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
+ @ApiOperation(value = "Get inactive topic policies on a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),
+ @ApiResponse(code = 500, message = "Internal server error"),})
+ public void getInactiveTopicPolicies(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace,
+ @PathParam("topic") @Encoded String
encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
+ if (topicPolicies.isInactiveTopicPoliciesSet()) {
+ asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
+ @ApiOperation(value = "Set inactive topic policies on a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),})
+ public void setInactiveTopicPolicies(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace,
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @ApiParam(value = "inactive
topic policies for the specified topic")
+ InactiveTopicPolicies
inactiveTopicPolicies) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateAdminAccessForTenant(tenant);
+ validatePoliciesReadOnlyAccess();
+ checkTopicLevelPolicyEnable();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex)
-> {
+ if (ex instanceof RestException) {
+ log.error("Failed set InactiveTopicPolicies", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed set InactiveTopicPolicies", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
+ @ApiOperation(value = "Delete inactive topic policies on a topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),})
+ public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant")
String tenant,
+ @PathParam("namespace")
String namespace,
+ @PathParam("topic")
@Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ setInactiveTopicPolicies(asyncResponse, tenant, namespace,
encodedTopic, null);
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Get max unacked messages per subscription config on
a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6f9860b..8ff51f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1825,14 +1825,19 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
delayedDeliveryTickTimeMillis =
data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
+ //If the topic-level policy already exists, the namespace-level policy
cannot override the topic-level policy.
+ TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (data.inactive_topic_policies != null) {
- this.inactiveTopicPolicies = data.inactive_topic_policies;
+ if (topicPolicies == null ||
!topicPolicies.isInactiveTopicPoliciesSet()) {
+ this.inactiveTopicPolicies = data.inactive_topic_policies;
+ }
} else {
ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
}
+
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
this.updateMaxPublishRate(data);
@@ -2344,24 +2349,50 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (log.isDebugEnabled()) {
log.debug("[{}] update topic policy: {}", topic, policies);
}
-
-
initializeTopicDispatchRateLimiterIfNeeded(Optional.ofNullable(policies));
- if (this.dispatchRateLimiter.isPresent() && policies != null
- && policies.getDispatchRate() != null) {
+ if (policies == null) {
+ return;
+ }
+ Optional<Policies> namespacePolicies = getNamespacePolicies();
+ initializeTopicDispatchRateLimiterIfNeeded(policies);
+ if (this.dispatchRateLimiter.isPresent() && policies.getDispatchRate()
!= null) {
dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
}
- if (policies != null && policies.getPublishRate() != null) {
+ if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
}
+
+ if (policies.isInactiveTopicPoliciesSet()) {
+ inactiveTopicPolicies = policies.getInactiveTopicPolicies();
+ } else {
+ //topic-level policies is null , so use namespace-level or
broker-level
+ namespacePolicies.ifPresent(nsPolicies -> {
+ if (nsPolicies.inactive_topic_policies != null) {
+ inactiveTopicPolicies = nsPolicies.inactive_topic_policies;
+ } else {
+ ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
+
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
+ });
+ }
+ }
+
+ private Optional<Policies> getNamespacePolicies(){
+ try {
+ return
Optional.ofNullable(brokerService.pulsar().getAdminClient().namespaces()
+ .getPolicies(TopicName.get(topic).getNamespace()));
+ } catch (Exception e) {
+ log.error("get namespace policies fail", e);
+ }
+ return Optional.empty();
}
- private void
initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+ private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies
policies) {
synchronized (dispatchRateLimiter) {
- if (!dispatchRateLimiter.isPresent() && policies.isPresent() &&
- policies.get().getDispatchRate() != null) {
+ if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate()
!= null) {
this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(this, Type.TOPIC));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 143a4ff..79a296e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -34,6 +35,9 @@ import
org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
public class InactiveTopicDeleteTest extends BrokerTestBase {
protected void setup() throws Exception {
@@ -125,9 +129,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
}
Assert.assertTrue(policies.isDeleteWhileInactive());
- Assert.assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_no_subscriptions);
- Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
- Assert.assertEquals(policies,
admin.namespaces().getInactiveTopicPolicies(namespace));
+ assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ assertEquals(policies,
admin.namespaces().getInactiveTopicPolicies(namespace));
admin.namespaces().removeInactiveTopicPolicies(namespace);
while (true) {
@@ -137,14 +141,14 @@ public class InactiveTopicDeleteTest extends
BrokerTestBase {
break;
}
}
- Assert.assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic,
false).get().get()).inactiveTopicPolicies
+ assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic,
false).get().get()).inactiveTopicPolicies
, defaultPolicy);
policies =
((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
Assert.assertTrue(policies.isDeleteWhileInactive());
- Assert.assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
- Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
- Assert.assertEquals(policies,
admin.namespaces().getInactiveTopicPolicies(namespace2));
+ assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ assertEquals(policies,
admin.namespaces().getInactiveTopicPolicies(namespace2));
admin.namespaces().removeInactiveTopicPolicies(namespace2);
while (true) {
@@ -154,7 +158,7 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
break;
}
}
- Assert.assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic2,
false).get().get()).inactiveTopicPolicies
+ assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic2,
false).get().get()).inactiveTopicPolicies
, defaultPolicy);
super.internalCleanup();
@@ -166,7 +170,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
final String namespace2 = "prop/ns-abc2";
final String namespace3 = "prop/ns-abc3";
List<String> namespaceList = Arrays.asList(namespace2, namespace3);
-
conf.setBrokerDeleteInactiveTopicsEnabled(true);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
@@ -293,4 +296,190 @@ public class InactiveTopicDeleteTest extends
BrokerTestBase {
super.internalCleanup();
}
+
+ @Test(timeOut = 20000)
+ public void testTopicLevelInActiveTopicApi() throws Exception {
+ super.resetConfig();
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ super.baseSetup();
+
+ final String topicName =
"persistent://prop/ns-abc/testMaxInactiveDuration-" +
UUID.randomUUID().toString();
+ admin.topics().createPartitionedTopic(topicName, 3);
+ //wait for cache init
+ Thread.sleep(2000);
+ InactiveTopicPolicies inactiveTopicPolicies =
admin.topics().getInactiveTopicPolicies(topicName);
+ assertNull(inactiveTopicPolicies);
+
+ InactiveTopicPolicies policies = new InactiveTopicPolicies();
+ policies.setDeleteWhileInactive(true);
+
policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ policies.setMaxInactiveDurationSeconds(10);
+ admin.topics().setInactiveTopicPolicies(topicName, policies);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertEquals(admin.topics().getInactiveTopicPolicies(topicName),
policies);
+ admin.topics().removeInactiveTopicPolicies(topicName);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getInactiveTopicPolicies(topicName) == null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertNull(admin.topics().getInactiveTopicPolicies(topicName));
+
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 30000)
+ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
+ super.resetConfig();
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
+
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ InactiveTopicPolicies defaultPolicy = new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions
+ , 1000, true);
+
+ super.baseSetup();
+ //wait for cache init
+ Thread.sleep(2000);
+ final String namespace = "prop/ns-abc";
+ final String topic =
"persistent://prop/ns-abc/testTopicLevelInactivePolicy" +
UUID.randomUUID().toString();
+ final String topic2 =
"persistent://prop/ns-abc/testTopicLevelInactivePolicy" +
UUID.randomUUID().toString();
+ final String topic3 =
"persistent://prop/ns-abc/testTopicLevelInactivePolicy" +
UUID.randomUUID().toString();
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+
+ for (String tp : topics) {
+ admin.topics().createNonPartitionedTopic(tp);
+ }
+
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
true);
+ admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies);
+
+ //wait for cache
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getInactiveTopicPolicies(topic) != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ InactiveTopicPolicies policies = ((PersistentTopic)
pulsar.getBrokerService()
+ .getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ Assert.assertTrue(policies.isDeleteWhileInactive());
+ assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ assertEquals(policies, admin.topics().getInactiveTopicPolicies(topic));
+
+ admin.topics().removeInactiveTopicPolicies(topic);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getInactiveTopicPolicies(topic) == null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ //Only the broker-level policies is set, so after removing the
topic-level policies
+ // , the topic will use the broker-level policies
+ assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic,
false).get().get()).inactiveTopicPolicies
+ , defaultPolicy);
+
+ policies =
((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
+ Assert.assertTrue(policies.isDeleteWhileInactive());
+ assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ assertEquals(policies,
admin.topics().getInactiveTopicPolicies(topic2));
+ inactiveTopicPolicies.setMaxInactiveDurationSeconds(999);
+ //Both broker level and namespace level policies are set, so after
removing the topic level policies
+ // , the topic will use the namespace level policies
+ admin.namespaces().setInactiveTopicPolicies(namespace,
inactiveTopicPolicies);
+ //wait for zk
+ Thread.sleep(1000);
+ admin.topics().removeInactiveTopicPolicies(topic2);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getInactiveTopicPolicies(topic2) == null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ InactiveTopicPolicies nsPolicies = ((PersistentTopic)
pulsar.getBrokerService()
+ .getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+ assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
+
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 30000)
+ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws
Exception {
+ final String namespace = "prop/ns-abc";
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+ super.baseSetup();
+ //wait for cache init
+ Thread.sleep(2000);
+ final String topic = "persistent://prop/ns-abc/test-" +
UUID.randomUUID();
+ final String topic2 = "persistent://prop/ns-abc/test-" +
UUID.randomUUID();
+ final String topic3 = "persistent://prop/ns-abc/test-" +
UUID.randomUUID();
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+ //create producer/consumer and close
+ Map<String, String> topicToSub = new HashMap<>();
+ for (String tp : topics) {
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(tp).create();
+ String subName = "sub" + System.currentTimeMillis();
+ topicToSub.put(tp, subName);
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe();
+ for (int i = 0; i < 10; i++) {
+ producer.send("Pulsar".getBytes());
+ }
+ consumer.close();
+ producer.close();
+ Thread.sleep(1);
+ }
+ // "topic" use delete_when_no_subscriptions, "topic2" use
delete_when_subscriptions_caught_up
+ // "topic3" use default:delete_when_no_subscriptions
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true);
+ admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
+
+ //wait for update
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getInactiveTopicPolicies(topic2) != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ // topic should still exist
+ Thread.sleep(2000);
+ Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
+ Assert.assertTrue(admin.topics().getList(namespace).contains(topic2));
+ Assert.assertTrue(admin.topics().getList(namespace).contains(topic3));
+
+ // no backlog, trigger delete_when_subscriptions_caught_up
+ admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
+ Thread.sleep(2000);
+ Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
+ // delete subscription, trigger delete_when_no_subscriptions
+ for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
+ admin.topics().deleteSubscription(entry.getKey(),
entry.getValue());
+ }
+ Thread.sleep(2000);
+ Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+ Assert.assertFalse(admin.topics().getList(namespace).contains(topic3));
+
+ super.internalCleanup();
+ }
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 89d4ba2..468f222 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1711,6 +1712,52 @@ public interface Topics {
CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String
topic);
/**
+ * get inactive topic policies of a topic.
+ * @param topic
+ * @return
+ * @throws PulsarAdminException
+ */
+ InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws
PulsarAdminException;
+
+ /**
+ * get inactive topic policies of a topic asynchronously.
+ * @param topic
+ * @return
+ */
+ CompletableFuture<InactiveTopicPolicies>
getInactiveTopicPoliciesAsync(String topic);
+
+ /**
+ * set inactive topic policies of a topic.
+ * @param topic
+ * @param maxNum
+ * @throws PulsarAdminException
+ */
+ void setInactiveTopicPolicies(String topic
+ , InactiveTopicPolicies inactiveTopicPolicies) throws
PulsarAdminException;
+
+ /**
+ * set inactive topic policies of a topic asynchronously.
+ * @param topic
+ * @param maxNum
+ * @return
+ */
+ CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic,
InactiveTopicPolicies inactiveTopicPolicies);
+
+ /**
+ * remove inactive topic policies of a topic.
+ * @param topic
+ * @throws PulsarAdminException
+ */
+ void removeInactiveTopicPolicies(String topic) throws PulsarAdminException;
+
+ /**
+ * remove inactive topic policies of a topic asynchronously.
+ * @param topic
+ * @return
+ */
+ CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String topic);
+
+ /**
* get offload policies of a topic.
* @param topic
* @return
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index f5e0a56..b4da586 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -71,6 +71,7 @@ import
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1509,6 +1510,85 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws
PulsarAdminException {
+ try {
+ return getInactiveTopicPoliciesAsync(topic).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<InactiveTopicPolicies>
getInactiveTopicPoliciesAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
+ final CompletableFuture<InactiveTopicPolicies> future = new
CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
+ @Override
+ public void completed(InactiveTopicPolicies inactiveTopicPolicies)
{
+ future.complete(inactiveTopicPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic
+ , InactiveTopicPolicies inactiveTopicPolicies) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
+ return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void setInactiveTopicPolicies(String topic
+ , InactiveTopicPolicies inactiveTopicPolicies) throws
PulsarAdminException {
+ try {
+ setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String
topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
+ public void removeInactiveTopicPolicies(String topic) throws
PulsarAdminException {
+ try {
+ removeInactiveTopicPoliciesAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic)
throws PulsarAdminException {
try {
return getDelayedDeliveryPolicyAsync(topic).
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 74bf417..965887d 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -760,6 +760,14 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("set-max-unacked-messages-on-subscription
persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1",
99);
+ cmdTopics.run(split("get-inactive-topic-policies
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("remove-inactive-topic-policies
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-inactive-topic-policies
persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions"));
+
verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
+ , new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
true));
+
// argument matcher for the timestamp in reset cursor. Since we can't
verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 7835372..f5ccaa1 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -50,6 +50,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -148,6 +150,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-maxProducers", new GetMaxProducers());
jcommander.addCommand("set-maxProducers", new SetMaxProducers());
jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
+ jcommander.addCommand("get-inactive-topic-policies", new
GetInactiveTopicPolicies());
+ jcommander.addCommand("set-inactive-topic-policies", new
SetInactiveTopicPolicies());
+ jcommander.addCommand("remove-inactive-topic-policies", new
RemoveInactiveTopicPolicies());
}
@Parameters(commandDescription = "Get the list of topics under a
namespace.")
@@ -1496,4 +1501,66 @@ public class CmdTopics extends CmdBase {
admin.topics().removeMaxProducers(persistentTopic);
}
}
+
+ @Parameters(commandDescription = "Get the inactive topic policies on a
topic")
+ private class GetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getInactiveTopicPolicies(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set the inactive topic policies on a
topic")
+ private class SetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable-delete-while-inactive", "-e" },
description = "Enable delete while inactive")
+ private boolean enableDeleteWhileInactive = false;
+
+ @Parameter(names = { "--disable-delete-while-inactive", "-d" },
description = "Disable delete while inactive")
+ private boolean disableDeleteWhileInactive = false;
+
+ @Parameter(names = {"--max-inactive-duration", "-t"}, description =
"Max duration of topic inactivity in seconds" +
+ ",topics that are inactive for longer than this value will be
deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+ private String deleteInactiveTopicsMaxInactiveDuration;
+
+ @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of
delete inactive topic" +
+ ",Valid options are: [delete_when_no_subscriptions,
delete_when_subscriptions_caught_up]", required = true)
+ private String inactiveTopicDeleteMode;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ long maxInactiveDurationInSeconds =
TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+
+ if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+ throw new ParameterException("Need to specify either
enable-delete-while-inactive or disable-delete-while-inactive");
+ }
+ InactiveTopicDeleteMode deleteMode = null;
+ try {
+ deleteMode =
InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException("delete mode can only be set to
delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+ }
+ admin.topics().setInactiveTopicPolicies(persistentTopic,
+ new InactiveTopicPolicies(deleteMode, (int)
maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove inactive topic policies from a
topic")
+ private class RemoveInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeInactiveTopicPolicies(persistentTopic);
+ }
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index baa8aea..343770d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -50,13 +50,18 @@ public class TopicPolicies {
private Long delayedDeliveryTickTimeMillis = null;
private Boolean delayedDeliveryEnabled = null;
private OffloadPolicies offloadPolicies;
+ private InactiveTopicPolicies inactiveTopicPolicies = null;
+ private DispatchRate dispatchRate = null;
+ private Long compactionThreshold = null;
+ private PublishRate publishRate = null;
+
+ public boolean isInactiveTopicPoliciesSet() {
+ return inactiveTopicPolicies != null;
+ }
public boolean isOffloadPoliciesSet() {
return offloadPolicies != null;
}
- private DispatchRate dispatchRate = null;
- private Long compactionThreshold = null;
- private PublishRate publishRate = null;
public boolean isMaxUnackedMessagesOnConsumerSet() {
return maxUnackedMessagesOnConsumer != null;