This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a441db  Support topic-level inactiveTopicPolicies (#7986)
4a441db is described below

commit 4a441db363efe1adeefcfca83cade5232c5b1666
Author: feynmanlin <[email protected]>
AuthorDate: Sat Sep 5 21:20:50 2020 +0800

    Support topic-level inactiveTopicPolicies (#7986)
    
    ### Motivation
    Support topic-level inactiveTopicPolicies
    
    ### Modifications
    Support set/get/remove inactiveTopicPolicies policy on topic level.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  18 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  64 +++++++
 .../broker/service/persistent/PersistentTopic.java |  49 ++++-
 .../broker/service/InactiveTopicDeleteTest.java    | 207 ++++++++++++++++++++-
 .../org/apache/pulsar/client/admin/Topics.java     |  47 +++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  80 ++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   8 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  67 +++++++
 .../pulsar/common/policies/data/TopicPolicies.java |  11 +-
 9 files changed, 528 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c82d136..b6a0b2d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,9 +97,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -843,6 +842,21 @@ public class PersistentTopicsBase extends AdminResource {
         return completableFuture;
     }
 
+    protected CompletableFuture<Void> 
internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
+        }
+        if (topicPolicies == null) {
+            topicPolicies = new TopicPolicies();
+        }
+        topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+    }
+
     private CompletableFuture<Void> 
internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName 
topicName) {
         return 
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
                 .thenAccept(optionalTopic -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c897124..936a7df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -57,6 +57,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -378,6 +379,69 @@ public class PersistentTopics extends PersistentTopicsBase 
{
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
+    @ApiOperation(value = "Get inactive topic policies on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getInactiveTopicPolicies(@Suspended final AsyncResponse 
asyncResponse,
+                                         @PathParam("tenant") String tenant,
+                                         @PathParam("namespace") String 
namespace,
+                                         @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
+        if (topicPolicies.isInactiveTopicPoliciesSet()) {
+            asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
+    @ApiOperation(value = "Set inactive topic policies on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),})
+    public void setInactiveTopicPolicies(@Suspended final AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace,
+                                                @PathParam("topic") @Encoded 
String encodedTopic,
+                                                @ApiParam(value = "inactive 
topic policies for the specified topic")
+                                                        InactiveTopicPolicies 
inactiveTopicPolicies) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        
internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex) 
-> {
+            if (ex instanceof RestException) {
+                log.error("Failed set InactiveTopicPolicies", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed set InactiveTopicPolicies", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
+    @ApiOperation(value = "Delete inactive topic policies on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),})
+    public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse 
asyncResponse,
+                                                       @PathParam("tenant") 
String tenant,
+                                                       @PathParam("namespace") 
String namespace,
+                                                       @PathParam("topic") 
@Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setInactiveTopicPolicies(asyncResponse, tenant, namespace, 
encodedTopic, null);
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
     @ApiOperation(value = "Get max unacked messages per subscription config on 
a topic.")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6f9860b..8ff51f6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1825,14 +1825,19 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             delayedDeliveryTickTimeMillis = 
data.delayed_delivery_policies.getTickTime();
             delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
         }
+        //If the topic-level policy already exists, the namespace-level policy 
cannot override the topic-level policy.
+        TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
         if (data.inactive_topic_policies != null) {
-            this.inactiveTopicPolicies = data.inactive_topic_policies;
+            if (topicPolicies == null || 
!topicPolicies.isInactiveTopicPoliciesSet()) {
+                this.inactiveTopicPolicies = data.inactive_topic_policies;
+            }
         } else {
             ServiceConfiguration cfg = 
brokerService.getPulsar().getConfiguration();
             resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
                     , 
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), 
cfg.isBrokerDeleteInactiveTopicsEnabled());
         }
 
+
         initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
 
         this.updateMaxPublishRate(data);
@@ -2344,24 +2349,50 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         if (log.isDebugEnabled()) {
             log.debug("[{}] update topic policy: {}", topic, policies);
         }
-
-        
initializeTopicDispatchRateLimiterIfNeeded(Optional.ofNullable(policies));
-        if (this.dispatchRateLimiter.isPresent() && policies != null
-                && policies.getDispatchRate() != null) {
+        if (policies == null) {
+            return;
+        }
+        Optional<Policies> namespacePolicies = getNamespacePolicies();
+        initializeTopicDispatchRateLimiterIfNeeded(policies);
+        if (this.dispatchRateLimiter.isPresent() && policies.getDispatchRate() 
!= null) {
             dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
                 
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
         }
 
-        if (policies != null && policies.getPublishRate() != null) {
+        if (policies.getPublishRate() != null) {
             topicPolicyPublishRate = policies.getPublishRate();
             updateTopicPublishDispatcher();
         }
+
+        if (policies.isInactiveTopicPoliciesSet()) {
+            inactiveTopicPolicies = policies.getInactiveTopicPolicies();
+        } else {
+            //topic-level policies is null , so use namespace-level or 
broker-level
+            namespacePolicies.ifPresent(nsPolicies -> {
+                if (nsPolicies.inactive_topic_policies != null) {
+                    inactiveTopicPolicies = nsPolicies.inactive_topic_policies;
+                } else {
+                    ServiceConfiguration cfg = 
brokerService.getPulsar().getConfiguration();
+                    
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                            , 
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), 
cfg.isBrokerDeleteInactiveTopicsEnabled());
+                }
+            });
+        }
+    }
+
+    private Optional<Policies> getNamespacePolicies(){
+        try {
+            return 
Optional.ofNullable(brokerService.pulsar().getAdminClient().namespaces()
+                    .getPolicies(TopicName.get(topic).getNamespace()));
+        } catch (Exception e) {
+            log.error("get namespace policies fail", e);
+        }
+        return Optional.empty();
     }
 
-    private void 
initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+    private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies 
policies) {
         synchronized (dispatchRateLimiter) {
-            if (!dispatchRateLimiter.isPresent() && policies.isPresent() &&
-                policies.get().getDispatchRate() != null) {
+            if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() 
!= null) {
                 this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(this, Type.TOPIC));
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 143a4ff..79a296e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -34,6 +35,9 @@ import 
org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
 public class InactiveTopicDeleteTest extends BrokerTestBase {
 
     protected void setup() throws Exception {
@@ -125,9 +129,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
         }
 
         Assert.assertTrue(policies.isDeleteWhileInactive());
-        Assert.assertEquals(policies.getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_no_subscriptions);
-        Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
-        Assert.assertEquals(policies, 
admin.namespaces().getInactiveTopicPolicies(namespace));
+        assertEquals(policies.getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        assertEquals(policies, 
admin.namespaces().getInactiveTopicPolicies(namespace));
 
         admin.namespaces().removeInactiveTopicPolicies(namespace);
         while (true) {
@@ -137,14 +141,14 @@ public class InactiveTopicDeleteTest extends 
BrokerTestBase {
                 break;
             }
         }
-        Assert.assertEquals(((PersistentTopic) 
pulsar.getBrokerService().getTopic(topic, 
false).get().get()).inactiveTopicPolicies
+        assertEquals(((PersistentTopic) 
pulsar.getBrokerService().getTopic(topic, 
false).get().get()).inactiveTopicPolicies
                 , defaultPolicy);
 
         policies = 
((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
         Assert.assertTrue(policies.isDeleteWhileInactive());
-        Assert.assertEquals(policies.getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
-        Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
-        Assert.assertEquals(policies, 
admin.namespaces().getInactiveTopicPolicies(namespace2));
+        assertEquals(policies.getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        assertEquals(policies, 
admin.namespaces().getInactiveTopicPolicies(namespace2));
 
         admin.namespaces().removeInactiveTopicPolicies(namespace2);
         while (true) {
@@ -154,7 +158,7 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
                 break;
             }
         }
-        Assert.assertEquals(((PersistentTopic) 
pulsar.getBrokerService().getTopic(topic2, 
false).get().get()).inactiveTopicPolicies
+        assertEquals(((PersistentTopic) 
pulsar.getBrokerService().getTopic(topic2, 
false).get().get()).inactiveTopicPolicies
                 , defaultPolicy);
 
         super.internalCleanup();
@@ -166,7 +170,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
         final String namespace2 = "prop/ns-abc2";
         final String namespace3 = "prop/ns-abc3";
         List<String> namespaceList = Arrays.asList(namespace2, namespace3);
-
         conf.setBrokerDeleteInactiveTopicsEnabled(true);
         conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
         super.baseSetup();
@@ -293,4 +296,190 @@ public class InactiveTopicDeleteTest extends 
BrokerTestBase {
 
         super.internalCleanup();
     }
+
+    @Test(timeOut = 20000)
+    public void testTopicLevelInActiveTopicApi() throws Exception {
+        super.resetConfig();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        super.baseSetup();
+
+        final String topicName = 
"persistent://prop/ns-abc/testMaxInactiveDuration-" + 
UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, 3);
+        //wait for cache init
+        Thread.sleep(2000);
+        InactiveTopicPolicies inactiveTopicPolicies = 
admin.topics().getInactiveTopicPolicies(topicName);
+        assertNull(inactiveTopicPolicies);
+
+        InactiveTopicPolicies policies = new InactiveTopicPolicies();
+        policies.setDeleteWhileInactive(true);
+        
policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        policies.setMaxInactiveDurationSeconds(10);
+        admin.topics().setInactiveTopicPolicies(topicName, policies);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertEquals(admin.topics().getInactiveTopicPolicies(topicName), 
policies);
+        admin.topics().removeInactiveTopicPolicies(topicName);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getInactiveTopicPolicies(topicName) == null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertNull(admin.topics().getInactiveTopicPolicies(topicName));
+
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 30000)
+    public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
+        super.resetConfig();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
+        
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        InactiveTopicPolicies defaultPolicy = new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions
+                , 1000, true);
+
+        super.baseSetup();
+        //wait for cache init
+        Thread.sleep(2000);
+        final String namespace = "prop/ns-abc";
+        final String topic = 
"persistent://prop/ns-abc/testTopicLevelInactivePolicy" + 
UUID.randomUUID().toString();
+        final String topic2 = 
"persistent://prop/ns-abc/testTopicLevelInactivePolicy" + 
UUID.randomUUID().toString();
+        final String topic3 = 
"persistent://prop/ns-abc/testTopicLevelInactivePolicy" + 
UUID.randomUUID().toString();
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+
+        for (String tp : topics) {
+            admin.topics().createNonPartitionedTopic(tp);
+        }
+
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true);
+        admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
+        
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
+        
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies);
+
+        //wait for cache
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getInactiveTopicPolicies(topic) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        InactiveTopicPolicies policies = ((PersistentTopic) 
pulsar.getBrokerService()
+                .getTopic(topic, false).get().get()).inactiveTopicPolicies;
+        Assert.assertTrue(policies.isDeleteWhileInactive());
+        assertEquals(policies.getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        assertEquals(policies, admin.topics().getInactiveTopicPolicies(topic));
+
+        admin.topics().removeInactiveTopicPolicies(topic);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getInactiveTopicPolicies(topic) == null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        //Only the broker-level policies is set, so after removing the 
topic-level policies
+        // , the topic will use the broker-level policies
+        assertEquals(((PersistentTopic) 
pulsar.getBrokerService().getTopic(topic, 
false).get().get()).inactiveTopicPolicies
+                , defaultPolicy);
+
+        policies = 
((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
+        Assert.assertTrue(policies.isDeleteWhileInactive());
+        assertEquals(policies.getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        assertEquals(policies, 
admin.topics().getInactiveTopicPolicies(topic2));
+        inactiveTopicPolicies.setMaxInactiveDurationSeconds(999);
+        //Both broker level and namespace level policies are set, so after 
removing the topic level policies
+        // , the topic will use the namespace level policies
+        admin.namespaces().setInactiveTopicPolicies(namespace, 
inactiveTopicPolicies);
+        //wait for zk
+        Thread.sleep(1000);
+        admin.topics().removeInactiveTopicPolicies(topic2);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getInactiveTopicPolicies(topic2) == null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        InactiveTopicPolicies nsPolicies = ((PersistentTopic) 
pulsar.getBrokerService()
+                .getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+        assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
+
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 30000)
+    public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws 
Exception {
+        final String namespace = "prop/ns-abc";
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+        super.baseSetup();
+        //wait for cache init
+        Thread.sleep(2000);
+        final String topic = "persistent://prop/ns-abc/test-" + 
UUID.randomUUID();
+        final String topic2 = "persistent://prop/ns-abc/test-" + 
UUID.randomUUID();
+        final String topic3 = "persistent://prop/ns-abc/test-" + 
UUID.randomUUID();
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+        //create producer/consumer and close
+        Map<String, String> topicToSub = new HashMap<>();
+        for (String tp : topics) {
+            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(tp).create();
+            String subName = "sub" + System.currentTimeMillis();
+            topicToSub.put(tp, subName);
+            Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe();
+            for (int i = 0; i < 10; i++) {
+                producer.send("Pulsar".getBytes());
+            }
+            consumer.close();
+            producer.close();
+            Thread.sleep(1);
+        }
+        // "topic" use delete_when_no_subscriptions, "topic2" use 
delete_when_subscriptions_caught_up
+        // "topic3" use default:delete_when_no_subscriptions
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true);
+        admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
+        
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
+
+        //wait for update
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getInactiveTopicPolicies(topic2) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+
+        // topic should still exist
+        Thread.sleep(2000);
+        Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
+        Assert.assertTrue(admin.topics().getList(namespace).contains(topic2));
+        Assert.assertTrue(admin.topics().getList(namespace).contains(topic3));
+
+        // no backlog, trigger delete_when_subscriptions_caught_up
+        admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
+        Thread.sleep(2000);
+        Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
+        // delete subscription, trigger delete_when_no_subscriptions
+        for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
+            admin.topics().deleteSubscription(entry.getKey(), 
entry.getValue());
+        }
+        Thread.sleep(2000);
+        Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+        Assert.assertFalse(admin.topics().getList(namespace).contains(topic3));
+
+        super.internalCleanup();
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 89d4ba2..468f222 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1711,6 +1712,52 @@ public interface Topics {
     CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String 
topic);
 
     /**
+     * get inactive topic policies of a topic.
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws 
PulsarAdminException;
+
+    /**
+     * get inactive topic policies of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<InactiveTopicPolicies> 
getInactiveTopicPoliciesAsync(String topic);
+
+    /**
+     * set inactive topic policies of a topic.
+     * @param topic
+     * @param maxNum
+     * @throws PulsarAdminException
+     */
+    void setInactiveTopicPolicies(String topic
+            , InactiveTopicPolicies inactiveTopicPolicies) throws 
PulsarAdminException;
+
+    /**
+     * set inactive topic policies of a topic asynchronously.
+     * @param topic
+     * @param maxNum
+     * @return
+     */
+    CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic, 
InactiveTopicPolicies inactiveTopicPolicies);
+
+    /**
+     * remove inactive topic policies of a topic.
+     * @param topic
+     * @throws PulsarAdminException
+     */
+    void removeInactiveTopicPolicies(String topic) throws PulsarAdminException;
+
+    /**
+     * remove inactive topic policies of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String topic);
+
+    /**
      * get offload policies of a topic.
      * @param topic
      * @return
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index f5e0a56..b4da586 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -71,6 +71,7 @@ import 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1509,6 +1510,85 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws 
PulsarAdminException {
+        try {
+            return getInactiveTopicPoliciesAsync(topic).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<InactiveTopicPolicies> 
getInactiveTopicPoliciesAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
+        final CompletableFuture<InactiveTopicPolicies> future = new 
CompletableFuture<>();
+        asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
+            @Override
+            public void completed(InactiveTopicPolicies inactiveTopicPolicies) 
{
+                future.complete(inactiveTopicPolicies);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                
future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic
+            , InactiveTopicPolicies inactiveTopicPolicies) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
+        return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void setInactiveTopicPolicies(String topic
+            , InactiveTopicPolicies inactiveTopicPolicies) throws 
PulsarAdminException {
+        try {
+            setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String 
topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
+    public void removeInactiveTopicPolicies(String topic) throws 
PulsarAdminException {
+        try {
+            removeInactiveTopicPoliciesAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
     public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) 
throws PulsarAdminException {
         try {
             return getDelayedDeliveryPolicyAsync(topic).
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 74bf417..965887d 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -760,6 +760,14 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("set-max-unacked-messages-on-subscription 
persistent://myprop/clust/ns1/ds1 -m 99"));
         
verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1",
 99);
 
+        cmdTopics.run(split("get-inactive-topic-policies 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-inactive-topic-policies 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-inactive-topic-policies 
persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions"));
+        
verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
+                , new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true));
+
         // argument matcher for the timestamp in reset cursor. Since we can't 
verify exact timestamp, we check for a
         // range of +/- 1 second of the expected timestamp
         class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 7835372..f5ccaa1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -50,6 +50,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -148,6 +150,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-maxProducers", new GetMaxProducers());
         jcommander.addCommand("set-maxProducers", new SetMaxProducers());
         jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
+        jcommander.addCommand("get-inactive-topic-policies", new 
GetInactiveTopicPolicies());
+        jcommander.addCommand("set-inactive-topic-policies", new 
SetInactiveTopicPolicies());
+        jcommander.addCommand("remove-inactive-topic-policies", new 
RemoveInactiveTopicPolicies());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a 
namespace.")
@@ -1496,4 +1501,66 @@ public class CmdTopics extends CmdBase {
             admin.topics().removeMaxProducers(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get the inactive topic policies on a 
topic")
+    private class GetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getInactiveTopicPolicies(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the inactive topic policies on a 
topic")
+    private class SetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable-delete-while-inactive", "-e" }, 
description = "Enable delete while inactive")
+        private boolean enableDeleteWhileInactive = false;
+
+        @Parameter(names = { "--disable-delete-while-inactive", "-d" }, 
description = "Disable delete while inactive")
+        private boolean disableDeleteWhileInactive = false;
+
+        @Parameter(names = {"--max-inactive-duration", "-t"}, description = 
"Max duration of topic inactivity in seconds" +
+                ",topics that are inactive for longer than this value will be 
deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+        private String deleteInactiveTopicsMaxInactiveDuration;
+
+        @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of 
delete inactive topic" +
+                ",Valid options are: [delete_when_no_subscriptions, 
delete_when_subscriptions_caught_up]", required = true)
+        private String inactiveTopicDeleteMode;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            long maxInactiveDurationInSeconds = 
TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+
+            if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+                throw new ParameterException("Need to specify either 
enable-delete-while-inactive or disable-delete-while-inactive");
+            }
+            InactiveTopicDeleteMode deleteMode = null;
+            try {
+                deleteMode = 
InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+            } catch (IllegalArgumentException e) {
+                throw new ParameterException("delete mode can only be set to 
delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+            }
+            admin.topics().setInactiveTopicPolicies(persistentTopic,
+                    new InactiveTopicPolicies(deleteMode, (int) 
maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove inactive topic policies from a 
topic")
+    private class RemoveInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeInactiveTopicPolicies(persistentTopic);
+        }
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index baa8aea..343770d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -50,13 +50,18 @@ public class TopicPolicies {
     private Long delayedDeliveryTickTimeMillis = null;
     private Boolean delayedDeliveryEnabled = null;
     private OffloadPolicies offloadPolicies;
+    private InactiveTopicPolicies inactiveTopicPolicies = null;
+    private DispatchRate dispatchRate = null;
+    private Long compactionThreshold = null;
+    private PublishRate publishRate = null;
+
+    public boolean isInactiveTopicPoliciesSet() {
+        return inactiveTopicPolicies != null;
+    }
 
     public boolean isOffloadPoliciesSet() {
         return offloadPolicies != null;
     }
-    private DispatchRate dispatchRate = null;
-    private Long compactionThreshold = null;
-    private PublishRate publishRate = null;
 
     public boolean isMaxUnackedMessagesOnConsumerSet() {
         return maxUnackedMessagesOnConsumer != null;

Reply via email to