This is an automated email from the ASF dual-hosted git repository.
linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53bc0d5 Deduplication status support cross multiple clusters (#13487)
53bc0d5 is described below
commit 53bc0d5b2a4f5221b053d7ea49595468b445a2d3
Author: feynmanlin <[email protected]>
AuthorDate: Sun Dec 26 17:13:33 2021 +0800
Deduplication status support cross multiple clusters (#13487)
* Persistence policies support cross multiple clusters
---
.../broker/admin/impl/PersistentTopicsBase.java | 9 ++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 9 ++--
.../service/ReplicatorTopicPoliciesTest.java | 20 +++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 14 +++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 61 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 4 ++
6 files changed, 110 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0f56e2b..8520d11 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2893,8 +2893,8 @@ public class PersistentTopicsBase extends AdminResource {
);
}
- protected CompletableFuture<Boolean> internalGetDeduplication(boolean
applied) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Boolean> internalGetDeduplication(boolean
applied, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled)
.orElseGet(() -> {
if (applied) {
@@ -2905,11 +2905,12 @@ public class PersistentTopicsBase extends AdminResource
{
}));
}
- protected CompletableFuture<Void> internalSetDeduplication(Boolean
enabled) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Void> internalSetDeduplication(Boolean
enabled, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setDeduplicationEnabled(enabled);
+ topicPolicies.setIsGlobal(isGlobal);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 253b388..50eb291 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1872,11 +1872,12 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetDeduplication(applied))
+ .thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getDeduplication", ex,
asyncResponse);
@@ -1897,12 +1898,13 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "DeduplicationEnabled policies for the specified
topic")
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetDeduplication(enabled))
+ .thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setDeduplication", ex,
asyncResponse);
@@ -1923,10 +1925,11 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetDeduplication(null))
+ .thenCompose(__ -> internalSetDeduplication(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("removeDeduplication", ex,
asyncResponse);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 959f6b9..3ac9917 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -127,6 +127,26 @@ public class ReplicatorTopicPoliciesTest extends
ReplicatorTestBase {
}
@Test
+ public void testReplicateDeduplicationStatusPolicies() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String topic = "persistent://" + namespace + "/topic" +
UUID.randomUUID();
+ init(namespace, topic);
+ // set subscription types policies
+ admin1.topicPolicies(true).setDeduplicationStatus(topic, true);
+ Awaitility.await().ignoreExceptions().untilAsserted(() ->
+
assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
+ Awaitility.await().ignoreExceptions().untilAsserted(() ->
+
assertTrue(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
+ // remove subscription types policies
+ admin1.topicPolicies(true).removeDeduplicationStatus(topic);
+ Awaitility.await().untilAsserted(() ->
+
assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
+ Awaitility.await().untilAsserted(() ->
+
assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
+
+ }
+
+ @Test
public void testReplicatorTopicPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace +
"/topic" + UUID.randomUUID();
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d19047f..5394475 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -915,6 +915,13 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-message-ttl
persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-deduplication
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-deduplication
persistent://myprop/clust/ns1/ds1 --disable"));
+
verify(mockTopicsPolicies).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1",
false);
+ cmdTopics.run(split("remove-deduplication
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+
// Reset the cmd, and check global option
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1
-g"));
@@ -967,6 +974,13 @@ public class PulsarAdminToolTest {
new PersistencePolicies(2, 1, 1, 100.0d));
cmdTopics.run(split("remove-persistence
persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
+
+ cmdTopics.run(split("get-deduplication
persistent://myprop/clust/ns1/ds1 -g"));
+
verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-deduplication
persistent://myprop/clust/ns1/ds1 --disable -g"));
+
verify(mockGlobalTopicsPolicies).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1",
false);
+ cmdTopics.run(split("remove-deduplication
persistent://myprop/clust/ns1/ds1 -g"));
+
verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
}
@Test
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 1850f4b..ddbe091 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -49,6 +49,10 @@ public class CmdTopicPolicies extends CmdBase {
jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
jcommander.addCommand("remove-backlog-quota", new
RemoveBacklogQuota());
+ jcommander.addCommand("set-deduplication", new
SetDeduplicationStatus());
+ jcommander.addCommand("get-deduplication", new
GetDeduplicationStatus());
+ jcommander.addCommand("remove-deduplication", new
RemoveDeduplicationStatus());
+
jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());
@@ -191,6 +195,63 @@ public class CmdTopicPolicies extends CmdBase {
}
}
+ @Parameters(commandDescription = "Enable or disable status for a topic")
+ private class SetDeduplicationStatus extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable", "-e" }, description = "Enable
deduplication")
+ private boolean enable = false;
+
+ @Parameter(names = { "--disable", "-d" }, description = "Disable
deduplication")
+ private boolean disable = false;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ + "If set to true, the removing operation will be replicate to
other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+
+ if (enable == disable) {
+ throw new ParameterException("Need to specify either --enable
or --disable");
+ }
+ getTopicPolicies(isGlobal).setDeduplicationStatus(persistentTopic,
enable);
+ }
+ }
+
+ @Parameters(commandDescription = "Get the deduplication status for a
topic")
+ private class GetDeduplicationStatus extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to get
this policy globally. ")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+
print(getTopicPolicies(isGlobal).getDeduplicationStatus(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove the deduplication status for a
topic")
+ private class RemoveDeduplicationStatus extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to
remove this policy globally. "
+ + "If set to true, the removing operation will be replicate to
other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+
getTopicPolicies(isGlobal).removeDeduplicationStatus(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get the backlog quota policies for a
topic")
private class GetBacklogQuotaMap extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 6930b0e..c97aaac 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -281,6 +281,10 @@ public class CmdTopics extends CmdBase {
cmdUsageFormatter.addDeprecatedCommand("get-persistence");
cmdUsageFormatter.addDeprecatedCommand("set-persistence");
cmdUsageFormatter.addDeprecatedCommand("remove-persistence");
+
+ cmdUsageFormatter.addDeprecatedCommand("get-deduplication");
+ cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
+ cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");
}
}