This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74db656 Support get topic applied policy for DeduplicationStatus
(#9339)
74db656 is described below
commit 74db656a76fc26036d72250d755ea20c89f65ccd
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 1 13:06:12 2021 +0800
Support get topic applied policy for DeduplicationStatus (#9339)
Master Issue: #9216
### Modifications
1) Make the topic-level API names consistent with the namespace-level ones
2) Fix the problem that the namespace-level policy cannot be removed
3) Fix the problem that the topic-level policy does not take effect when
multiple levels of policy are set at the same time
4) Add the `applied` API
### Verifying this change
Verify that the new API is correct
Verify that the applied API is correct
Verify that when policies of different levels exist at the same time, the
priority is correct
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 7 +-
.../broker/admin/impl/PersistentTopicsBase.java | 15 ++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 20 +++
.../pulsar/broker/admin/v2/PersistentTopics.java | 30 +++--
.../broker/service/persistent/PersistentTopic.java | 2 +
.../service/persistent/TopicDuplicationTest.java | 148 +++++++++++++++++----
.../pulsar/client/api/ClientDeduplicationTest.java | 16 +++
.../org/apache/pulsar/client/admin/Namespaces.java | 27 ++++
.../org/apache/pulsar/client/admin/Topics.java | 65 +++++++++
.../client/admin/internal/NamespacesImpl.java | 57 ++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 88 ++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 13 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 25 ++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 27 +++-
14 files changed, 490 insertions(+), 50 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 78e3d2f..b8841c1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -856,7 +856,7 @@ public abstract class NamespacesBase extends AdminResource {
internalSetAutoSubscriptionCreation(asyncResponse, null);
}
- protected void internalModifyDeduplication(boolean enableDeduplication) {
+ protected void internalModifyDeduplication(Boolean enableDeduplication) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{
@@ -1999,6 +1999,11 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
+ protected Boolean internalGetDeduplication() {
+ validateNamespacePolicyOperation(namespaceName,
PolicyName.DEDUPLICATION, PolicyOperation.READ);
+ return getNamespacePolicies(namespaceName).deduplicationEnabled;
+ }
+
protected Integer internalGetMaxConsumersPerTopic() {
validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_topic;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 159a6be..b6844cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2561,7 +2561,20 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean
enabled) {
+ protected CompletableFuture<Boolean> internalGetDeduplication(boolean
applied) {
+ Boolean deduplicationEnabled = getTopicPolicies(topicName)
+ .map(TopicPolicies::getDeduplicationEnabled)
+ .orElseGet(() -> {
+ if (applied) {
+ Boolean enabled =
getNamespacePolicies(namespaceName).deduplicationEnabled;
+ return enabled == null ?
config().isBrokerDeduplicationEnabled() : enabled;
+ }
+ return null;
+ });
+ return CompletableFuture.completedFuture(deduplicationEnabled);
+ }
+
+ protected CompletableFuture<Void> internalSetDeduplication(Boolean
enabled) {
TopicPolicies topicPolicies = null;
try {
topicPolicies =
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index a9ff3dd..79c4cc4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -336,6 +336,16 @@ public class Namespaces extends NamespacesBase {
internalSetSubscriptionExpirationTime(expirationTime);
}
+ @GET
+ @Path("/{tenant}/{namespace}/deduplication")
+ @ApiOperation(value = "Get broker side deduplication for all topics in a
namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
+ public Boolean getDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetDeduplication();
+ }
+
@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all
topics in a namespace")
@@ -349,6 +359,16 @@ public class Namespaces extends NamespacesBase {
internalModifyDeduplication(enableDeduplication);
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/deduplication")
+ @ApiOperation(value = "Remove broker side deduplication for all topics in
a namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
+ public void removeDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalModifyDeduplication(null);
+ }
+
@POST
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting
for a namespace")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f98a2ee..1c41475 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1657,17 +1657,23 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the
topic level policy and retry")})
- public void getDeduplicationEnabled(@Suspended final AsyncResponse
asyncResponse,
+ public void getDeduplication(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic)
{
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
- if (topicPolicies.isDeduplicationSet()) {
- asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
+ internalGetDeduplication(applied).whenComplete((res, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Failed get Deduplication", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed get Deduplication", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ asyncResponse.resume(res);
+ }
+ });
}
@POST
@@ -1677,7 +1683,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the
topic level policy and retry")})
- public void setDeduplicationEnabled(
+ public void setDeduplication(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@@ -1685,7 +1691,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "DeduplicationEnabled policies for the specified
topic")
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
- internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
+ internalSetDeduplication(enabled).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated deduplication", ex);
asyncResponse.resume(ex);
@@ -1706,12 +1712,12 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the
topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
- public void removeDeduplicationEnabled(@Suspended final AsyncResponse
asyncResponse,
+ public void removeDeduplication(@Suspended final AsyncResponse
asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
- setDeduplicationEnabled(asyncResponse, tenant, namespace,
encodedTopic, null);
+ setDeduplication(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2b40965..3edf30a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2741,6 +2741,8 @@ public class PersistentTopic extends AbstractTopic
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
updateUnackedMessagesExceededOnConsumer(null);
+
+ checkDeduplicationStatus();
}
private Optional<Policies> getNamespacePolicies() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index fdea264..4a37d63 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -38,7 +38,9 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.awaitility.Awaitility;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
@@ -87,6 +89,121 @@ public class TopicDuplicationTest extends
ProducerConsumerBase {
}
@Test(timeOut = 10000)
+ public void testTopicDuplicationApi2() throws Exception {
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ admin.topics().createPartitionedTopic(topicName, 3);
+ waitCacheInit(topicName);
+ Boolean enabled = admin.topics().getDeduplicationStatus(topicName);
+ assertNull(enabled);
+
+ admin.topics().setDeduplicationStatus(topicName, true);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> admin.topics().getDeduplicationStatus(topicName)
!= null);
+ Assert.assertEquals(admin.topics().getDeduplicationStatus(topicName),
true);
+
+ admin.topics().removeDeduplicationStatus(topicName);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
+ assertNull(admin.topics().getDeduplicationStatus(topicName));
+ }
+
+ @Test(timeOut = 10000)
+ public void testTopicDuplicationAppliedApi() throws Exception {
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ waitCacheInit(topicName);
+ assertNull(admin.namespaces().getDeduplicationStatus(myNamespace));
+ assertNull(admin.topics().getDeduplicationStatus(topicName));
+ assertEquals(admin.topics().getDeduplicationStatus(topicName,
true).booleanValue(),
+ conf.isBrokerDeduplicationEnabled());
+
+ admin.namespaces().setDeduplicationStatus(myNamespace, false);
+ Awaitility.await().untilAsserted(() ->
assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
+ admin.topics().setDeduplicationStatus(topicName, true);
+ Awaitility.await().untilAsserted(() ->
assertTrue(admin.topics().getDeduplicationStatus(topicName, true)));
+
+ admin.topics().removeDeduplicationStatus(topicName);
+ Awaitility.await().untilAsserted(() ->
assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
+ admin.namespaces().removeDeduplicationStatus(myNamespace);
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getDeduplicationStatus(topicName,
true).booleanValue(),
+ conf.isBrokerDeduplicationEnabled()));
+ }
+
+ @Test(timeOut = 30000)
+ public void testDeduplicationPriority() throws Exception {
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ final String producerName = "my-producer";
+ final int maxMsgNum = 5;
+ waitCacheInit(topicName);
+ //1) Start up producer and send msg.We specified the max sequenceId
+ @Cleanup
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName)
+ .producerName(producerName).create();
+ long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+ MessageDeduplication messageDeduplication =
persistentTopic.getMessageDeduplication();
+ //broker-level deduplication is enabled in setup() by default
+ checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
+ //disabled in namespace-level
+ admin.namespaces().setDeduplicationStatus(myNamespace, false);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
+ sendMessageAndGetMaxSeq(maxMsgNum, producer);
+ checkDeduplicationDisabled(producerName, messageDeduplication);
+ //enabled in topic-level
+ admin.topics().setDeduplicationStatus(topicName, true);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getDeduplicationStatus(topicName)));
+ Awaitility.await().untilAsserted(() ->
assertTrue(messageDeduplication.isEnabled()));
+ long maxSeq2 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
+ checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq2);
+ //remove topic-level, use namespace-level
+ admin.topics().removeDeduplicationStatus(topicName);
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.topics().getDeduplicationStatus(topicName)));
+ Awaitility.await().untilAsserted(() ->
assertFalse(messageDeduplication.isEnabled()));
+ producer.newMessage().value("msg").sequenceId(1).send();
+ checkDeduplicationDisabled(producerName, messageDeduplication);
+ //remove namespace-level , use broker-level
+ admin.namespaces().removeDeduplicationStatus(myNamespace);
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
+ Awaitility.await().untilAsserted(() ->
assertTrue(messageDeduplication.isEnabled()));
+ long maxSeq3 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
+ checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq3);
+ }
+
+ private long sendMessageAndGetMaxSeq(int maxMsgNum, Producer producer)
throws Exception{
+ long seq = System.nanoTime();
+ for (int i = 0; i <= maxMsgNum; i++) {
+ producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
+ }
+ return seq + maxMsgNum;
+ }
+
+ private void checkDeduplicationDisabled(String producerName,
MessageDeduplication messageDeduplication) throws Exception {
+ messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+ if (ex != null) {
+ fail("should not fail");
+ }
+
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
+
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
+ assertEquals(messageDeduplication.highestSequencedPushed.size(),
0);
+ }).get();
+ }
+
+ private void checkDeduplicationEnabled(String producerName,
MessageDeduplication messageDeduplication,
+ long maxSeq) throws Exception {
+ messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+ if (ex != null) {
+ fail("should not fail");
+ }
+ assertNotNull(messageDeduplication.highestSequencedPersisted);
+ assertNotNull(messageDeduplication.highestSequencedPushed);
+ long seqId =
messageDeduplication.getLastPublishedSequenceId(producerName);
+ assertEquals(seqId, maxSeq);
+
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(),
maxSeq);
+
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(),
maxSeq);
+ }).get();
+ }
+
+ @Test(timeOut = 10000)
public void testDuplicationSnapshotApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
@@ -192,27 +309,13 @@ public class TopicDuplicationTest extends
ProducerConsumerBase {
@Cleanup
Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName)
.producerName(producerName).create();
- long seq = System.currentTimeMillis();
- for (int i = 0; i <= maxMsgNum; i++) {
- producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
- }
- long maxSeq = seq + maxMsgNum;
+ long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
//2) Max sequenceId should be recorded correctly
CompletableFuture<Optional<Topic>> completableFuture =
pulsar.getBrokerService().getTopics().get(topicName);
Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
PersistentTopic persistentTopic = (PersistentTopic) topic;
MessageDeduplication messageDeduplication =
persistentTopic.getMessageDeduplication();
- messageDeduplication.checkStatus().whenComplete((res, ex) -> {
- if (ex != null) {
- fail("should not fail");
- }
- assertNotNull(messageDeduplication.highestSequencedPersisted);
- assertNotNull(messageDeduplication.highestSequencedPushed);
- long seqId =
messageDeduplication.getLastPublishedSequenceId(producerName);
- assertEquals(seqId, maxSeq);
-
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(),
maxSeq);
-
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(),
maxSeq);
- }).get();
+ checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
//3) disable the deduplication check
admin.topics().enableDeduplication(topicName, false);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
@@ -221,14 +324,7 @@ public class TopicDuplicationTest extends
ProducerConsumerBase {
producer.newMessage().value("msg-" + i).sequenceId(maxSeq +
i).send();
}
//4) Max sequenceId record should be clear
- messageDeduplication.checkStatus().whenComplete((res, ex) -> {
- if (ex != null) {
- fail("should not fail");
- }
-
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
-
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
- assertEquals(messageDeduplication.highestSequencedPushed.size(),
0);
- }).get();
+ checkDeduplicationDisabled(producerName, messageDeduplication);
}
@@ -315,7 +411,7 @@ public class TopicDuplicationTest extends
ProducerConsumerBase {
}
@Test(timeOut = 30000)
- private void testNamespacePolicyTakeSnapshot() throws Exception {
+ public void testNamespacePolicyTakeSnapshot() throws Exception {
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
@@ -367,7 +463,7 @@ public class TopicDuplicationTest extends
ProducerConsumerBase {
}
@Test(timeOut = 30000)
- private void testDisableNamespacePolicyTakeSnapshot() throws Exception {
+ public void testDisableNamespacePolicyTakeSnapshot() throws Exception {
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 347903b..bdb0cc4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -19,9 +19,13 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -41,6 +45,18 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
}
@Test
+ public void testNamespaceDeduplicationApi() throws Exception {
+ final String namespace = "my-property/my-ns";
+ assertNull(admin.namespaces().getDeduplicationStatus(namespace));
+ admin.namespaces().setDeduplicationStatus(namespace, true);
+ Awaitility.await().untilAsserted(() ->
assertTrue(admin.namespaces().getDeduplicationStatus(namespace)));
+ admin.namespaces().setDeduplicationStatus(namespace, false);
+ Awaitility.await().untilAsserted(() ->
assertFalse(admin.namespaces().getDeduplicationStatus(namespace)));
+ admin.namespaces().removeDeduplicationStatus(namespace);
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getDeduplicationStatus(namespace)));
+ }
+
+ @Test
public void testProducerSequenceAfterReconnect() throws Exception {
String topic =
"persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 226877f..1323024 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1086,6 +1086,33 @@ public interface Namespaces {
CompletableFuture<Void> deleteNamespaceAntiAffinityGroupAsync(String
namespace);
/**
+ * Remove the deduplication status for all topics within a namespace.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeDeduplicationStatus(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Remove the deduplication status for all topics within a namespace
asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Void> removeDeduplicationStatusAsync(String namespace);
+ /**
+ * Get the deduplication status for all topics within a namespace.
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ Boolean getDeduplicationStatus(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Get the deduplication status for all topics within a namespace
asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Boolean> getDeduplicationStatusAsync(String namespace);
+ /**
* Set the deduplication status for all topics within a namespace.
* <p/>
* When deduplication is enabled, the broker will prevent to store the
same message multiple times.
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 1cbf890..b5cc4ac 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2118,6 +2118,7 @@ public interface Topics {
* @return
* @throws PulsarAdminException
*/
+ @Deprecated
Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException;
/**
@@ -2125,14 +2126,45 @@ public interface Topics {
* @param topic
* @return
*/
+ @Deprecated
CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic);
/**
+ * get deduplication enabled of a topic.
+ * @param topic
+ * @return
+ * @throws PulsarAdminException
+ */
+ Boolean getDeduplicationStatus(String topic) throws PulsarAdminException;
+
+ /**
+ * get deduplication enabled of a topic asynchronously.
+ * @param topic
+ * @return
+ */
+ CompletableFuture<Boolean> getDeduplicationStatusAsync(String topic);
+ /**
+ * get applied deduplication enabled of a topic.
+ * @param topic
+ * @return
+ * @throws PulsarAdminException
+ */
+ Boolean getDeduplicationStatus(String topic, boolean applied) throws
PulsarAdminException;
+
+ /**
+ * get applied deduplication enabled of a topic asynchronously.
+ * @param topic
+ * @return
+ */
+ CompletableFuture<Boolean> getDeduplicationStatusAsync(String topic,
boolean applied);
+
+ /**
* set deduplication enabled of a topic.
* @param topic
* @param enabled
* @throws PulsarAdminException
*/
+ @Deprecated
void enableDeduplication(String topic, boolean enabled) throws
PulsarAdminException;
/**
@@ -2141,13 +2173,31 @@ public interface Topics {
* @param enabled
* @return
*/
+ @Deprecated
CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean
enabled);
/**
+ * set deduplication enabled of a topic.
+ * @param topic
+ * @param enabled
+ * @throws PulsarAdminException
+ */
+ void setDeduplicationStatus(String topic, boolean enabled) throws
PulsarAdminException;
+
+ /**
+ * set deduplication enabled of a topic asynchronously.
+ * @param topic
+ * @param enabled
+ * @return
+ */
+ CompletableFuture<Void> setDeduplicationStatusAsync(String topic, boolean
enabled);
+
+ /**
* remove deduplication enabled of a topic.
* @param topic
* @throws PulsarAdminException
*/
+ @Deprecated
void disableDeduplication(String topic) throws PulsarAdminException;
/**
@@ -2155,9 +2205,24 @@ public interface Topics {
* @param topic
* @return
*/
+ @Deprecated
CompletableFuture<Void> disableDeduplicationAsync(String topic);
/**
+ * remove deduplication enabled of a topic.
+ * @param topic
+ * @throws PulsarAdminException
+ */
+ void removeDeduplicationStatus(String topic) throws PulsarAdminException;
+
+ /**
+ * remove deduplication enabled of a topic asynchronously.
+ * @param topic
+ * @return
+ */
+ CompletableFuture<Void> removeDeduplicationStatusAsync(String topic);
+
+ /**
* Set message-dispatch-rate (topic can dispatch this many messages per
second).
*
* @param topic
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 40ef320..1b70f79 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -870,6 +870,63 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public void removeDeduplicationStatus(String namespace) throws
PulsarAdminException {
+ try {
+ removeDeduplicationStatusAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeDeduplicationStatusAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "deduplication");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
+ public Boolean getDeduplicationStatus(String namespace) throws
PulsarAdminException {
+ try {
+ return getDeduplicationStatusAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> getDeduplicationStatusAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "deduplication");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean enabled) {
+ future.complete(enabled);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setDeduplicationStatus(String namespace, boolean
enableDeduplication) throws PulsarAdminException {
try {
setDeduplicationStatusAsync(namespace, enableDeduplication)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index e4ca396..4e5ee73 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1849,6 +1849,51 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public Boolean getDeduplicationStatus(String topic) throws
PulsarAdminException {
+ return getDeduplicationStatus(topic, false);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> getDeduplicationStatusAsync(String
topic) {
+ return getDeduplicationStatusAsync(topic, false);
+ }
+
+ @Override
+ public Boolean getDeduplicationStatus(String topic, boolean applied)
throws PulsarAdminException {
+ try {
+ return getDeduplicationStatusAsync(topic, applied).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> getDeduplicationStatusAsync(String
topic, boolean applied) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "deduplicationEnabled");
+ path = path.queryParam("applied", applied);
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean enabled) {
+ future.complete(enabled);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void enableDeduplication(String topic, boolean enabled) throws
PulsarAdminException {
try {
enableDeduplicationAsync(topic, enabled).
@@ -1871,6 +1916,28 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+    public void setDeduplicationStatus(String topic, boolean enabled) throws PulsarAdminException {
+        try {
+            // Go through this API's own async variant rather than the older enableDeduplicationAsync.
+            setDeduplicationStatusAsync(topic, enabled).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // preserve the interrupt flag before wrapping
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setDeduplicationStatusAsync(String topic, boolean enabled) {
+        final WebTarget target = topicPath(validateTopic(topic), "deduplicationEnabled");
+        final Entity<Boolean> body = Entity.entity(enabled, MediaType.APPLICATION_JSON); // flag sent as JSON
+        return asyncPostRequest(target, body);
+    }
+
+ @Override
public void disableDeduplication(String topic) throws PulsarAdminException
{
try {
disableDeduplicationAsync(topic).
@@ -1893,6 +1960,27 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+    public void removeDeduplicationStatus(String topic) throws PulsarAdminException {
+        try { // wait for the async removal for at most readTimeoutMs milliseconds
+            removeDeduplicationStatusAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failed) {
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeDeduplicationStatusAsync(String topic) {
+        // Issue the delete request against the topic's "deduplicationEnabled" resource.
+        final WebTarget target = topicPath(validateTopic(topic), "deduplicationEnabled");
+        return asyncDeleteRequest(target);
+    }
+
+ @Override
public OffloadPolicies getOffloadPolicies(String topic) throws
PulsarAdminException {
return getOffloadPolicies(topic, false);
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 64dffbc..e519b17 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -365,8 +365,12 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-subscription-expiration-time
myprop/clust/ns1 -t 60"));
verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60);
+ namespaces.run(split("get-deduplication myprop/clust/ns1"));
+ verify(mockNamespaces).getDeduplicationStatus("myprop/clust/ns1");
namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1",
true);
+ namespaces.run(split("remove-deduplication myprop/clust/ns1"));
+ verify(mockNamespaces).removeDeduplicationStatus("myprop/clust/ns1");
namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t
non-partitioned"));
verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1",
@@ -773,7 +777,10 @@ public class PulsarAdminToolTest {
verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1",
false);
cmdTopics.run(split("set-deduplication
persistent://myprop/clust/ns1/ds1 --disable"));
- verify(mockTopics,
times(2)).enableDeduplication("persistent://myprop/clust/ns1/ds1", false);
+
verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1",
false);
+
+ cmdTopics.run(split("remove-deduplication
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-replicator-dispatch-rate
persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
@@ -793,9 +800,9 @@ public class PulsarAdminToolTest {
verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-deduplication-enabled
persistent://myprop/clust/ns1/ds1"));
-
verify(mockTopics).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
+
verify(mockTopics).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-deduplication
persistent://myprop/clust/ns1/ds1"));
- verify(mockTopics,
times(2)).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
+ verify(mockTopics,
times(2)).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-offload-policies
persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1",
false);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7e261cc..e988e82 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -478,6 +478,29 @@ public class CmdNamespaces extends CmdBase {
}
}
+    @Parameters(commandDescription = "Get Deduplication for a namespace")
+    private class GetDeduplication extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params); // namespace name derived from the positional args
+            print(getAdmin().namespaces().getDeduplicationStatus(namespace)); // print the admin API's answer
+        }
+    }
+
+    @Parameters(commandDescription = "Remove Deduplication for a namespace")
+    private class RemoveDeduplication extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params); // namespace name derived from the positional args
+            getAdmin().namespaces().removeDeduplicationStatus(namespace); // prints nothing on success
+        }
+    }
@Parameters(commandDescription = "Enable or disable deduplication for a
namespace")
private class SetDeduplication extends CliCommand {
@@ -2014,6 +2037,8 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("delete-anti-affinity-group", new
DeleteAntiAffinityGroup());
jcommander.addCommand("set-deduplication", new SetDeduplication());
+ jcommander.addCommand("get-deduplication", new GetDeduplication());
+ jcommander.addCommand("remove-deduplication", new
RemoveDeduplication());
jcommander.addCommand("set-auto-topic-creation", new
SetAutoTopicCreation());
jcommander.addCommand("remove-auto-topic-creation", new
RemoveAutoTopicCreation());
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2bd133a..3d6f82e 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -131,10 +131,11 @@ public class CmdTopics extends CmdBase {
//deprecated commands
jcommander.addCommand("enable-deduplication", new
EnableDeduplication());
jcommander.addCommand("disable-deduplication", new
DisableDeduplication());
- jcommander.addCommand("get-deduplication-enabled", new
GetDeduplicationEnabled());
+ jcommander.addCommand("get-deduplication-enabled", new
GetDeduplicationStatus());
- jcommander.addCommand("set-deduplication", new SetDeduplication());
- jcommander.addCommand("get-deduplication", new
GetDeduplicationEnabled());
+ jcommander.addCommand("set-deduplication", new
SetDeduplicationStatus());
+ jcommander.addCommand("get-deduplication", new
GetDeduplicationStatus());
+ jcommander.addCommand("remove-deduplication", new
RemoveDeduplicationStatus());
jcommander.addCommand("get-deduplication-snapshot-interval", new
GetDeduplicationSnapshotInterval());
jcommander.addCommand("set-deduplication-snapshot-interval", new
SetDeduplicationSnapshotInterval());
@@ -1346,7 +1347,7 @@ public class CmdTopics extends CmdBase {
}
@Parameters(commandDescription = "Enable or disable deduplication for a
topic")
- private class SetDeduplication extends CliCommand {
+ private class SetDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
private java.util.List<String> params;
@@ -1363,19 +1364,31 @@ public class CmdTopics extends CmdBase {
if (enable == disable) {
throw new ParameterException("Need to specify either --enable
or --disable");
}
- getAdmin().topics().enableDeduplication(persistentTopic, enable);
+ getAdmin().topics().setDeduplicationStatus(persistentTopic,
enable);
}
}
@Parameters(commandDescription = "Get the deduplication policy for a
topic")
- private class GetDeduplicationEnabled extends CliCommand {
+ private class GetDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
-
print(getAdmin().topics().getDeduplicationEnabled(persistentTopic));
+ print(getAdmin().topics().getDeduplicationStatus(persistentTopic));
+ }
+ }
+
+    @Parameters(commandDescription = "Remove the deduplication policy for a
topic")
+    private class RemoveDeduplicationStatus extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params); // topic name derived from the positional args
+            getAdmin().topics().removeDeduplicationStatus(persistentTopic); // prints nothing on success
         }
     }