This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 74db656  Support get topic applied policy for DeduplicationStatus 
(#9339)
74db656 is described below

commit 74db656a76fc26036d72250d755ea20c89f65ccd
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 1 13:06:12 2021 +0800

    Support get topic applied policy for DeduplicationStatus (#9339)
    
    Master Issue: #9216
    
    ### Modifications
    1) Make the topic-level API names consistent with the namespace-level ones
    2) Fix the problem that the namespace-level policy cannot be removed
    3) Fix the problem that the topic-level policy does not take effect when 
policies are set at multiple levels at the same time
    4) Add an "applied" option to the get API that returns the effective policy
    
    ### Verifying this change
    Verify that the new API is correct
    Verify that the applied API is correct
    Verify that when policies of different levels exist at the same time, the 
priority is correct
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   7 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  15 ++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  20 +++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  30 +++--
 .../broker/service/persistent/PersistentTopic.java |   2 +
 .../service/persistent/TopicDuplicationTest.java   | 148 +++++++++++++++++----
 .../pulsar/client/api/ClientDeduplicationTest.java |  16 +++
 .../org/apache/pulsar/client/admin/Namespaces.java |  27 ++++
 .../org/apache/pulsar/client/admin/Topics.java     |  65 +++++++++
 .../client/admin/internal/NamespacesImpl.java      |  57 ++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  88 ++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  13 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  25 ++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  27 +++-
 14 files changed, 490 insertions(+), 50 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 78e3d2f..b8841c1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -856,7 +856,7 @@ public abstract class NamespacesBase extends AdminResource {
         internalSetAutoSubscriptionCreation(asyncResponse, null);
     }
 
-    protected void internalModifyDeduplication(boolean enableDeduplication) {
+    protected void internalModifyDeduplication(Boolean enableDeduplication) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{
@@ -1999,6 +1999,11 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
+    protected Boolean internalGetDeduplication() {
+        validateNamespacePolicyOperation(namespaceName, 
PolicyName.DEDUPLICATION, PolicyOperation.READ);
+        return getNamespacePolicies(namespaceName).deduplicationEnabled;
+    }
+
     protected Integer internalGetMaxConsumersPerTopic() {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_consumers_per_topic;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 159a6be..b6844cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2561,7 +2561,20 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean 
enabled) {
+    protected CompletableFuture<Boolean> internalGetDeduplication(boolean 
applied) {
+        Boolean deduplicationEnabled = getTopicPolicies(topicName)
+                .map(TopicPolicies::getDeduplicationEnabled)
+                .orElseGet(() -> {
+                    if (applied) {
+                        Boolean enabled = 
getNamespacePolicies(namespaceName).deduplicationEnabled;
+                        return enabled == null ? 
config().isBrokerDeduplicationEnabled() : enabled;
+                    }
+                    return null;
+                });
+        return CompletableFuture.completedFuture(deduplicationEnabled);
+    }
+
+    protected CompletableFuture<Void> internalSetDeduplication(Boolean 
enabled) {
         TopicPolicies topicPolicies = null;
         try {
             topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index a9ff3dd..79c4cc4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -336,6 +336,16 @@ public class Namespaces extends NamespacesBase {
         internalSetSubscriptionExpirationTime(expirationTime);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/deduplication")
+    @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
+    public Boolean getDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetDeduplication();
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all 
topics in a namespace")
@@ -349,6 +359,16 @@ public class Namespaces extends NamespacesBase {
         internalModifyDeduplication(enableDeduplication);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/deduplication")
+    @ApiOperation(value = "Remove broker side deduplication for all topics in 
a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
+    public void removeDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalModifyDeduplication(null);
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/autoTopicCreation")
     @ApiOperation(value = "Override broker's allowAutoTopicCreation setting 
for a namespace")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f98a2ee..1c41475 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1657,17 +1657,23 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
             @ApiResponse(code = 405,
                     message = "Topic level policy is disabled, to enable the 
topic level policy and retry")})
-    public void getDeduplicationEnabled(@Suspended final AsyncResponse 
asyncResponse,
+    public void getDeduplication(@Suspended final AsyncResponse asyncResponse,
                              @PathParam("tenant") String tenant,
                              @PathParam("namespace") String namespace,
-                             @PathParam("topic") @Encoded String encodedTopic) 
{
+                             @PathParam("topic") @Encoded String encodedTopic,
+                             @QueryParam("applied") boolean applied) {
         validateTopicName(tenant, namespace, encodedTopic);
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
-        if (topicPolicies.isDeduplicationSet()) {
-            asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
-        } else {
-            asyncResponse.resume(Response.noContent().build());
-        }
+        internalGetDeduplication(applied).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed get Deduplication", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed get Deduplication", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(res);
+            }
+        });
     }
 
     @POST
@@ -1677,7 +1683,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
             @ApiResponse(code = 405,
                     message = "Topic level policy is disabled, to enable the 
topic level policy and retry")})
-    public void setDeduplicationEnabled(
+    public void setDeduplication(
             @Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
@@ -1685,7 +1691,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "DeduplicationEnabled policies for the specified 
topic")
                     Boolean enabled) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
+        internalSetDeduplication(enabled).whenComplete((r, ex) -> {
             if (ex instanceof RestException) {
                 log.error("Failed updated deduplication", ex);
                 asyncResponse.resume(ex);
@@ -1706,12 +1712,12 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 405,
                     message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
-    public void removeDeduplicationEnabled(@Suspended final AsyncResponse 
asyncResponse,
+    public void removeDeduplication(@Suspended final AsyncResponse 
asyncResponse,
                                            @PathParam("tenant") String tenant,
                                            @PathParam("namespace") String 
namespace,
                                            @PathParam("topic") @Encoded String 
encodedTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
-        setDeduplicationEnabled(asyncResponse, tenant, namespace, 
encodedTopic, null);
+        setDeduplication(asyncResponse, tenant, namespace, encodedTopic, null);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2b40965..3edf30a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2741,6 +2741,8 @@ public class PersistentTopic extends AbstractTopic
         replicators.forEach((name, replicator) -> replicator.getRateLimiter()
                 .ifPresent(DispatchRateLimiter::updateDispatchRate));
         updateUnackedMessagesExceededOnConsumer(null);
+
+        checkDeduplicationStatus();
     }
 
     private Optional<Policies> getNamespacePolicies() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index fdea264..4a37d63 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -38,7 +38,9 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.awaitility.Awaitility;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -87,6 +89,121 @@ public class TopicDuplicationTest extends 
ProducerConsumerBase {
     }
 
     @Test(timeOut = 10000)
+    public void testTopicDuplicationApi2() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, 3);
+        waitCacheInit(topicName);
+        Boolean enabled = admin.topics().getDeduplicationStatus(topicName);
+        assertNull(enabled);
+
+        admin.topics().setDeduplicationStatus(topicName, true);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> admin.topics().getDeduplicationStatus(topicName) 
!= null);
+        Assert.assertEquals(admin.topics().getDeduplicationStatus(topicName), 
true);
+
+        admin.topics().removeDeduplicationStatus(topicName);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
+        assertNull(admin.topics().getDeduplicationStatus(topicName));
+    }
+
+    @Test(timeOut = 10000)
+    public void testTopicDuplicationAppliedApi() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        waitCacheInit(topicName);
+        assertNull(admin.namespaces().getDeduplicationStatus(myNamespace));
+        assertNull(admin.topics().getDeduplicationStatus(topicName));
+        assertEquals(admin.topics().getDeduplicationStatus(topicName, 
true).booleanValue(),
+                conf.isBrokerDeduplicationEnabled());
+
+        admin.namespaces().setDeduplicationStatus(myNamespace, false);
+        Awaitility.await().untilAsserted(() -> 
assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
+        admin.topics().setDeduplicationStatus(topicName, true);
+        Awaitility.await().untilAsserted(() -> 
assertTrue(admin.topics().getDeduplicationStatus(topicName, true)));
+
+        admin.topics().removeDeduplicationStatus(topicName);
+        Awaitility.await().untilAsserted(() -> 
assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
+        admin.namespaces().removeDeduplicationStatus(myNamespace);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topics().getDeduplicationStatus(topicName, 
true).booleanValue(),
+                conf.isBrokerDeduplicationEnabled()));
+    }
+
+    @Test(timeOut = 30000)
+    public void testDeduplicationPriority() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String producerName = "my-producer";
+        final int maxMsgNum = 5;
+        waitCacheInit(topicName);
+        //1) Start up producer and send msg. We specify the max sequenceId
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .producerName(producerName).create();
+        long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+        MessageDeduplication messageDeduplication = 
persistentTopic.getMessageDeduplication();
+        //broker-level deduplication is enabled in setup() by default
+        checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
+        //disabled in namespace-level
+        admin.namespaces().setDeduplicationStatus(myNamespace, false);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
+        sendMessageAndGetMaxSeq(maxMsgNum, producer);
+        checkDeduplicationDisabled(producerName, messageDeduplication);
+        //enabled in topic-level
+        admin.topics().setDeduplicationStatus(topicName, true);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.topics().getDeduplicationStatus(topicName)));
+        Awaitility.await().untilAsserted(() -> 
assertTrue(messageDeduplication.isEnabled()));
+        long maxSeq2 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
+        checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq2);
+        //remove topic-level, use namespace-level
+        admin.topics().removeDeduplicationStatus(topicName);
+        Awaitility.await().untilAsserted(() -> 
assertNull(admin.topics().getDeduplicationStatus(topicName)));
+        Awaitility.await().untilAsserted(() -> 
assertFalse(messageDeduplication.isEnabled()));
+        producer.newMessage().value("msg").sequenceId(1).send();
+        checkDeduplicationDisabled(producerName, messageDeduplication);
+        //remove namespace-level , use broker-level
+        admin.namespaces().removeDeduplicationStatus(myNamespace);
+        Awaitility.await().untilAsserted(() -> 
assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
+        Awaitility.await().untilAsserted(() -> 
assertTrue(messageDeduplication.isEnabled()));
+        long maxSeq3 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
+        checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq3);
+    }
+
+    private long sendMessageAndGetMaxSeq(int maxMsgNum, Producer producer) 
throws Exception{
+        long seq = System.nanoTime();
+        for (int i = 0; i <= maxMsgNum; i++) {
+            producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
+        }
+        return seq + maxMsgNum;
+    }
+
+    private void checkDeduplicationDisabled(String producerName, 
MessageDeduplication messageDeduplication) throws Exception {
+        messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+            if (ex != null) {
+                fail("should not fail");
+            }
+            
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
+            
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
+            assertEquals(messageDeduplication.highestSequencedPushed.size(), 
0);
+        }).get();
+    }
+
+    private void checkDeduplicationEnabled(String producerName, 
MessageDeduplication messageDeduplication,
+                                           long maxSeq) throws Exception {
+        messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+            if (ex != null) {
+                fail("should not fail");
+            }
+            assertNotNull(messageDeduplication.highestSequencedPersisted);
+            assertNotNull(messageDeduplication.highestSequencedPushed);
+            long seqId = 
messageDeduplication.getLastPublishedSequenceId(producerName);
+            assertEquals(seqId, maxSeq);
+            
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(),
 maxSeq);
+            
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(),
 maxSeq);
+        }).get();
+    }
+
+    @Test(timeOut = 10000)
     public void testDuplicationSnapshotApi() throws Exception {
         final String topicName = testTopic + UUID.randomUUID().toString();
         admin.topics().createPartitionedTopic(topicName, 3);
@@ -192,27 +309,13 @@ public class TopicDuplicationTest extends 
ProducerConsumerBase {
         @Cleanup
         Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
                 .producerName(producerName).create();
-        long seq = System.currentTimeMillis();
-        for (int i = 0; i <= maxMsgNum; i++) {
-            producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
-        }
-        long maxSeq = seq + maxMsgNum;
+        long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
         //2) Max sequenceId should be recorded correctly
         CompletableFuture<Optional<Topic>> completableFuture = 
pulsar.getBrokerService().getTopics().get(topicName);
         Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
         PersistentTopic persistentTopic = (PersistentTopic) topic;
         MessageDeduplication messageDeduplication = 
persistentTopic.getMessageDeduplication();
-        messageDeduplication.checkStatus().whenComplete((res, ex) -> {
-            if (ex != null) {
-                fail("should not fail");
-            }
-            assertNotNull(messageDeduplication.highestSequencedPersisted);
-            assertNotNull(messageDeduplication.highestSequencedPushed);
-            long seqId = 
messageDeduplication.getLastPublishedSequenceId(producerName);
-            assertEquals(seqId, maxSeq);
-            
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(),
 maxSeq);
-            
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(),
 maxSeq);
-        }).get();
+        checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
         //3) disable the deduplication check
         admin.topics().enableDeduplication(topicName, false);
         Awaitility.await().atMost(5, TimeUnit.SECONDS)
@@ -221,14 +324,7 @@ public class TopicDuplicationTest extends 
ProducerConsumerBase {
             producer.newMessage().value("msg-" + i).sequenceId(maxSeq + 
i).send();
         }
         //4) Max sequenceId record should be clear
-        messageDeduplication.checkStatus().whenComplete((res, ex) -> {
-            if (ex != null) {
-                fail("should not fail");
-            }
-            
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
-            
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
-            assertEquals(messageDeduplication.highestSequencedPushed.size(), 
0);
-        }).get();
+        checkDeduplicationDisabled(producerName, messageDeduplication);
 
     }
 
@@ -315,7 +411,7 @@ public class TopicDuplicationTest extends 
ProducerConsumerBase {
     }
 
     @Test(timeOut = 30000)
-    private void testNamespacePolicyTakeSnapshot() throws Exception {
+    public void testNamespacePolicyTakeSnapshot() throws Exception {
         resetConfig();
         conf.setBrokerDeduplicationEnabled(true);
         conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
@@ -367,7 +463,7 @@ public class TopicDuplicationTest extends 
ProducerConsumerBase {
     }
 
     @Test(timeOut = 30000)
-    private void testDisableNamespacePolicyTakeSnapshot() throws Exception {
+    public void testDisableNamespacePolicyTakeSnapshot() throws Exception {
         resetConfig();
         conf.setBrokerDeduplicationEnabled(true);
         conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 347903b..bdb0cc4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -19,9 +19,13 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
 import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -41,6 +45,18 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
     }
 
     @Test
+    public void testNamespaceDeduplicationApi() throws Exception {
+        final String namespace = "my-property/my-ns";
+        assertNull(admin.namespaces().getDeduplicationStatus(namespace));
+        admin.namespaces().setDeduplicationStatus(namespace, true);
+        Awaitility.await().untilAsserted(() -> 
assertTrue(admin.namespaces().getDeduplicationStatus(namespace)));
+        admin.namespaces().setDeduplicationStatus(namespace, false);
+        Awaitility.await().untilAsserted(() -> 
assertFalse(admin.namespaces().getDeduplicationStatus(namespace)));
+        admin.namespaces().removeDeduplicationStatus(namespace);
+        Awaitility.await().untilAsserted(() -> 
assertNull(admin.namespaces().getDeduplicationStatus(namespace)));
+    }
+
+    @Test
     public void testProducerSequenceAfterReconnect() throws Exception {
         String topic = 
"persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
         admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 226877f..1323024 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1086,6 +1086,33 @@ public interface Namespaces {
     CompletableFuture<Void> deleteNamespaceAntiAffinityGroupAsync(String 
namespace);
 
     /**
+     * Remove the deduplication status for all topics within a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removeDeduplicationStatus(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Remove the deduplication status for all topics within a namespace 
asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<Void> removeDeduplicationStatusAsync(String namespace);
+    /**
+     * Get the deduplication status for all topics within a namespace.
+     * @param namespace
+     * @return
+     * @throws PulsarAdminException
+     */
+    Boolean getDeduplicationStatus(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Get the deduplication status for all topics within a namespace 
asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<Boolean> getDeduplicationStatusAsync(String namespace);
+    /**
      * Set the deduplication status for all topics within a namespace.
      * <p/>
      * When deduplication is enabled, the broker will prevent to store the 
same message multiple times.
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 1cbf890..b5cc4ac 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2118,6 +2118,7 @@ public interface Topics {
      * @return
      * @throws PulsarAdminException
      */
+    @Deprecated
     Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException;
 
     /**
@@ -2125,14 +2126,45 @@ public interface Topics {
      * @param topic
      * @return
      */
+    @Deprecated
     CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic);
 
     /**
+     * get deduplication enabled of a topic.
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    Boolean getDeduplicationStatus(String topic) throws PulsarAdminException;
+
+    /**
+     * get deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Boolean> getDeduplicationStatusAsync(String topic);
+    /**
+     * get applied deduplication enabled of a topic.
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    Boolean getDeduplicationStatus(String topic, boolean applied) throws 
PulsarAdminException;
+
+    /**
+     * get applied deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Boolean> getDeduplicationStatusAsync(String topic, 
boolean applied);
+
+    /**
      * set deduplication enabled of a topic.
      * @param topic
      * @param enabled
      * @throws PulsarAdminException
      */
+    @Deprecated
     void enableDeduplication(String topic, boolean enabled) throws 
PulsarAdminException;
 
     /**
@@ -2141,13 +2173,31 @@ public interface Topics {
      * @param enabled
      * @return
      */
+    @Deprecated
     CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean 
enabled);
 
     /**
+     * set deduplication enabled of a topic.
+     * @param topic
+     * @param enabled
+     * @throws PulsarAdminException
+     */
+    void setDeduplicationStatus(String topic, boolean enabled) throws 
PulsarAdminException;
+
+    /**
+     * set deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @param enabled
+     * @return
+     */
+    CompletableFuture<Void> setDeduplicationStatusAsync(String topic, boolean 
enabled);
+
+    /**
      * remove deduplication enabled of a topic.
      * @param topic
      * @throws PulsarAdminException
      */
+    @Deprecated
     void disableDeduplication(String topic) throws PulsarAdminException;
 
     /**
@@ -2155,9 +2205,24 @@ public interface Topics {
      * @param topic
      * @return
      */
+    @Deprecated
     CompletableFuture<Void> disableDeduplicationAsync(String topic);
 
     /**
+     * remove deduplication enabled of a topic.
+     * @param topic
+     * @throws PulsarAdminException
+     */
+    void removeDeduplicationStatus(String topic) throws PulsarAdminException;
+
+    /**
+     * remove deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Void> removeDeduplicationStatusAsync(String topic);
+
+    /**
      * Set message-dispatch-rate (topic can dispatch this many messages per 
second).
      *
      * @param topic
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 40ef320..1b70f79 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -870,6 +870,63 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
+    public void removeDeduplicationStatus(String namespace)
+            throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most
+        // readTimeoutMs milliseconds and translate failures.
+        final CompletableFuture<Void> pending =
+                removeDeduplicationStatusAsync(namespace);
+        try {
+            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (ExecutionException failed) {
+            // The cause is rethrown as-is, cast to PulsarAdminException.
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeDeduplicationStatusAsync(
+            String namespace) {
+        // Issue an async delete against the namespace's "deduplication" path.
+        final WebTarget target =
+                namespacePath(NamespaceName.get(namespace), "deduplication");
+        return asyncDeleteRequest(target);
+    }
+
+    @Override
+    public Boolean getDeduplicationStatus(String namespace)
+            throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most
+        // readTimeoutMs milliseconds and translate failures.
+        final CompletableFuture<Boolean> pending =
+                getDeduplicationStatusAsync(namespace);
+        try {
+            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (ExecutionException failed) {
+            // The cause is rethrown as-is, cast to PulsarAdminException.
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getDeduplicationStatusAsync(
+            String namespace) {
+        final WebTarget target =
+                namespacePath(NamespaceName.get(namespace), "deduplication");
+        final CompletableFuture<Boolean> result = new CompletableFuture<>();
+        // Bridge the request callback onto the future handed to the caller.
+        final InvocationCallback<Boolean> bridge =
+                new InvocationCallback<Boolean>() {
+                    @Override
+                    public void completed(Boolean status) {
+                        result.complete(status);
+                    }
+
+                    @Override
+                    public void failed(Throwable error) {
+                        // Convert the cause through getApiException.
+                        result.completeExceptionally(
+                                getApiException(error.getCause()));
+                    }
+                };
+        asyncGetRequest(target, bridge);
+        return result;
+    }
+
+    @Override
     public void setDeduplicationStatus(String namespace, boolean 
enableDeduplication) throws PulsarAdminException {
         try {
             setDeduplicationStatusAsync(namespace, enableDeduplication)
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index e4ca396..4e5ee73 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1849,6 +1849,51 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public Boolean getDeduplicationStatus(String topic)
+            throws PulsarAdminException {
+        // Convenience overload: delegates with applied = false.
+        final boolean applied = false;
+        return getDeduplicationStatus(topic, applied);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getDeduplicationStatusAsync(
+            String topic) {
+        // Convenience overload: delegates with applied = false.
+        final boolean applied = false;
+        return getDeduplicationStatusAsync(topic, applied);
+    }
+
+    @Override
+    public Boolean getDeduplicationStatus(String topic, boolean applied)
+            throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most
+        // readTimeoutMs milliseconds and translate failures.
+        final CompletableFuture<Boolean> pending =
+                getDeduplicationStatusAsync(topic, applied);
+        try {
+            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (ExecutionException failed) {
+            // The cause is rethrown as-is, cast to PulsarAdminException.
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getDeduplicationStatusAsync(
+            String topic, boolean applied) {
+        // Target the topic's "deduplicationEnabled" path and forward the
+        // caller's flag as the "applied" query parameter.
+        final WebTarget target =
+                topicPath(validateTopic(topic), "deduplicationEnabled")
+                        .queryParam("applied", applied);
+        final CompletableFuture<Boolean> result = new CompletableFuture<>();
+        // Bridge the request callback onto the future handed to the caller.
+        final InvocationCallback<Boolean> bridge =
+                new InvocationCallback<Boolean>() {
+                    @Override
+                    public void completed(Boolean status) {
+                        result.complete(status);
+                    }
+
+                    @Override
+                    public void failed(Throwable error) {
+                        // Convert the cause through getApiException.
+                        result.completeExceptionally(
+                                getApiException(error.getCause()));
+                    }
+                };
+        asyncGetRequest(target, bridge);
+        return result;
+    }
+
+    @Override
     public void enableDeduplication(String topic, boolean enabled) throws 
PulsarAdminException {
         try {
             enableDeduplicationAsync(topic, enabled).
@@ -1871,6 +1916,28 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public void setDeduplicationStatus(String topic, boolean enabled)
+            throws PulsarAdminException {
+        // Synchronous facade over setDeduplicationStatusAsync: waits for at
+        // most readTimeoutMs milliseconds and translates failures into
+        // PulsarAdminException.
+        try {
+            // Delegate to the matching async variant rather than the
+            // deprecated enableDeduplicationAsync, as the other synchronous
+            // wrappers in this class do with their own async counterparts.
+            setDeduplicationStatusAsync(topic, enabled)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            // The cause is rethrown as-is, cast to PulsarAdminException.
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setDeduplicationStatusAsync(
+            String topic, boolean enabled) {
+        // POST the flag as JSON to the topic's "deduplicationEnabled" path.
+        final WebTarget target =
+                topicPath(validateTopic(topic), "deduplicationEnabled");
+        final Entity<Boolean> payload =
+                Entity.entity(enabled, MediaType.APPLICATION_JSON);
+        return asyncPostRequest(target, payload);
+    }
+
+    @Override
     public void disableDeduplication(String topic) throws PulsarAdminException 
{
         try {
             disableDeduplicationAsync(topic).
@@ -1893,6 +1960,27 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public void removeDeduplicationStatus(String topic)
+            throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most
+        // readTimeoutMs milliseconds and translate failures.
+        final CompletableFuture<Void> pending =
+                removeDeduplicationStatusAsync(topic);
+        try {
+            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (ExecutionException failed) {
+            // The cause is rethrown as-is, cast to PulsarAdminException.
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeDeduplicationStatusAsync(
+            String topic) {
+        // Issue an async delete against the topic's "deduplicationEnabled"
+        // path.
+        final WebTarget target =
+                topicPath(validateTopic(topic), "deduplicationEnabled");
+        return asyncDeleteRequest(target);
+    }
+
+    @Override
     public OffloadPolicies getOffloadPolicies(String topic) throws 
PulsarAdminException {
         return getOffloadPolicies(topic, false);
     }
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 64dffbc..e519b17 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -365,8 +365,12 @@ public class PulsarAdminToolTest {
         namespaces.run(split("set-subscription-expiration-time 
myprop/clust/ns1 -t 60"));
         
verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60);
 
+        namespaces.run(split("get-deduplication myprop/clust/ns1"));
+        verify(mockNamespaces).getDeduplicationStatus("myprop/clust/ns1");
         namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
         verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", 
true);
+        namespaces.run(split("remove-deduplication myprop/clust/ns1"));
+        verify(mockNamespaces).removeDeduplicationStatus("myprop/clust/ns1");
 
         namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t 
non-partitioned"));
         verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1",
@@ -773,7 +777,10 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", 
false);
 
         cmdTopics.run(split("set-deduplication 
persistent://myprop/clust/ns1/ds1 --disable"));
-        verify(mockTopics, 
times(2)).enableDeduplication("persistent://myprop/clust/ns1/ds1", false);
+        
verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", 
false);
+
+        cmdTopics.run(split("remove-deduplication 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-replicator-dispatch-rate 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
@@ -793,9 +800,9 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-deduplication-enabled 
persistent://myprop/clust/ns1/ds1"));
-        
verify(mockTopics).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
+        
verify(mockTopics).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("get-deduplication 
persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics, 
times(2)).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics, 
times(2)).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-offload-policies 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", 
false);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7e261cc..e988e82 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -478,6 +478,29 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    /** CLI command that prints the deduplication status of a namespace. */
+    @Parameters(commandDescription = "Get Deduplication for a namespace")
+    private class GetDeduplication extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the positional argument, then query and print.
+            final String ns = validateNamespace(params);
+            final Boolean status =
+                    getAdmin().namespaces().getDeduplicationStatus(ns);
+            print(status);
+        }
+    }
+
+    /** CLI command that removes the deduplication status of a namespace. */
+    @Parameters(commandDescription = "Remove Deduplication for a namespace")
+    private class RemoveDeduplication extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the positional argument, then issue the removal.
+            final String ns = validateNamespace(params);
+            getAdmin().namespaces().removeDeduplicationStatus(ns);
+        }
+    }
 
     @Parameters(commandDescription = "Enable or disable deduplication for a 
namespace")
     private class SetDeduplication extends CliCommand {
@@ -2014,6 +2037,8 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("delete-anti-affinity-group", new 
DeleteAntiAffinityGroup());
 
         jcommander.addCommand("set-deduplication", new SetDeduplication());
+        jcommander.addCommand("get-deduplication", new GetDeduplication());
+        jcommander.addCommand("remove-deduplication", new 
RemoveDeduplication());
 
         jcommander.addCommand("set-auto-topic-creation", new 
SetAutoTopicCreation());
         jcommander.addCommand("remove-auto-topic-creation", new 
RemoveAutoTopicCreation());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2bd133a..3d6f82e 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -131,10 +131,11 @@ public class CmdTopics extends CmdBase {
         //deprecated commands
         jcommander.addCommand("enable-deduplication", new 
EnableDeduplication());
         jcommander.addCommand("disable-deduplication", new 
DisableDeduplication());
-        jcommander.addCommand("get-deduplication-enabled", new 
GetDeduplicationEnabled());
+        jcommander.addCommand("get-deduplication-enabled", new 
GetDeduplicationStatus());
 
-        jcommander.addCommand("set-deduplication", new SetDeduplication());
-        jcommander.addCommand("get-deduplication", new 
GetDeduplicationEnabled());
+        jcommander.addCommand("set-deduplication", new 
SetDeduplicationStatus());
+        jcommander.addCommand("get-deduplication", new 
GetDeduplicationStatus());
+        jcommander.addCommand("remove-deduplication", new 
RemoveDeduplicationStatus());
 
         jcommander.addCommand("get-deduplication-snapshot-interval", new 
GetDeduplicationSnapshotInterval());
         jcommander.addCommand("set-deduplication-snapshot-interval", new 
SetDeduplicationSnapshotInterval());
@@ -1346,7 +1347,7 @@ public class CmdTopics extends CmdBase {
     }
 
     @Parameters(commandDescription = "Enable or disable deduplication for a 
topic")
-    private class SetDeduplication extends CliCommand {
+    private class SetDeduplicationStatus extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
@@ -1363,19 +1364,31 @@ public class CmdTopics extends CmdBase {
             if (enable == disable) {
                 throw new ParameterException("Need to specify either --enable 
or --disable");
             }
-            getAdmin().topics().enableDeduplication(persistentTopic, enable);
+            getAdmin().topics().setDeduplicationStatus(persistentTopic, 
enable);
         }
     }
 
     @Parameters(commandDescription = "Get the deduplication policy for a 
topic")
-    private class GetDeduplicationEnabled extends CliCommand {
+    private class GetDeduplicationStatus extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            
print(getAdmin().topics().getDeduplicationEnabled(persistentTopic));
+            print(getAdmin().topics().getDeduplicationStatus(persistentTopic));
+        }
+    }
+
+    /** CLI command that removes the deduplication policy of a topic. */
+    @Parameters(
+            commandDescription = "Remove the deduplication policy for a topic")
+    private class RemoveDeduplicationStatus extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic",
+                required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the positional argument, then issue the removal.
+            final String topicName = validatePersistentTopic(params);
+            getAdmin().topics().removeDeduplicationStatus(topicName);
         }
     }
 

Reply via email to