This is an automated email from the ASF dual-hosted git repository.
linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f599c9 Persistence topic policies support cross multiple clusters (#13483)
9f599c9 is described below
commit 9f599c9572e5d9b1f15efa6e895e7eb29b284e57
Author: feynmanlin <[email protected]>
AuthorDate: Fri Dec 24 15:44:16 2021 +0800
Persistence topic policies support cross multiple clusters (#13483)
* Persistence policies support cross multiple clusters
---
.../broker/admin/impl/PersistentTopicsBase.java | 15 +++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 9 ++-
.../service/ReplicatorTopicPoliciesTest.java | 22 +++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 16 +++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 71 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 4 ++
6 files changed, 128 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 21b8540..f383fc7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2999,8 +2999,8 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected CompletableFuture<PersistencePolicies>
internalGetPersistence(boolean applied) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<PersistencePolicies>
internalGetPersistence(boolean applied, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getPersistence)
.orElseGet(() -> {
if (applied) {
@@ -3018,23 +3018,26 @@ public class PersistentTopicsBase extends AdminResource {
}));
}
- protected CompletableFuture<Void>
internalSetPersistence(PersistencePolicies persistencePolicies) {
+ protected CompletableFuture<Void>
internalSetPersistence(PersistencePolicies persistencePolicies,
+ boolean isGlobal)
{
validatePersistencePolicies(persistencePolicies);
- return getTopicPoliciesAsyncWithRetry(topicName)
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setPersistence(persistencePolicies);
+ topicPolicies.setIsGlobal(isGlobal);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
});
}
- protected CompletableFuture<Void> internalRemovePersistence() {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Void> internalRemovePersistence(boolean
isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setPersistence(null);
+ op.get().setIsGlobal(isGlobal);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
op.get());
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 39b1c03..25fbf83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2039,11 +2039,12 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetPersistence(applied))
+ .thenCompose(__ -> internalGetPersistence(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getPersistence", ex,
asyncResponse);
@@ -2066,11 +2067,12 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Bookkeeper persistence policies for specified
topic")
PersistencePolicies persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetPersistence(persistencePolicies))
+ .thenCompose(__ -> internalSetPersistence(persistencePolicies,
isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully updated persistence policies: "
@@ -2101,11 +2103,12 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalRemovePersistence())
+ .thenCompose(__ -> internalRemovePersistence(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove persistence policies:
namespace={}, topic={}",
clientAppId(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index c8c5553..3c0e3fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
@@ -86,6 +87,27 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
}
@Test
+ public void testReplicatePersistentPolicies() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String topic = "persistent://" + namespace + "/topic" +
UUID.randomUUID();
+ init(namespace, topic);
+ // set PersistencePolicies
+ PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000);
+ admin1.topicPolicies(true).setPersistence(topic, policies);
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin2.topicPolicies(true).getPersistence(topic),
policies));
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin3.topicPolicies(true).getPersistence(topic),
policies));
+ //remove PersistencePolicies
+ admin1.topicPolicies(true).removePersistence(topic);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin2.topicPolicies(true).getPersistence(topic)));
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin3.topicPolicies(true).getPersistence(topic)));
+ }
+
+ @Test
public void testReplicatorTopicPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace +
"/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 90ae899..b4adf64 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -900,6 +900,14 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-retention
persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-persistence
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1
-e 2 -w 1 -a 1 -r 100.0"));
+
verify(mockTopicsPolicies).setPersistence("persistent://myprop/clust/ns1/ds1",
+ new PersistencePolicies(2, 1, 1, 100.0d));
+ cmdTopics.run(split("remove-persistence
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
+
// Reset the cmd, and check global option
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1
-g"));
@@ -937,6 +945,14 @@ public class PulsarAdminToolTest {
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("remove-backlog-quota
persistent://myprop/clust/ns1/ds1 -t message_age"));
verify(mockTopicsPolicies).removeBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.BacklogQuotaType.message_age);
+
+ cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1
-g"));
+
verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1
-e 2 -w 1 -a 1 -r 100.0 -g"));
+
verify(mockGlobalTopicsPolicies).setPersistence("persistent://myprop/clust/ns1/ds1",
+ new PersistencePolicies(2, 1, 1, 100.0d));
+ cmdTopics.run(split("remove-persistence
persistent://myprop/clust/ns1/ds1 -g"));
+
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index ceea877..e54a4e6 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -44,6 +45,10 @@ public class CmdTopicPolicies extends CmdBase {
jcommander.addCommand("get-backlog-quota", new GetBacklogQuotaMap());
jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
jcommander.addCommand("remove-backlog-quota", new
RemoveBacklogQuota());
+
+ jcommander.addCommand("get-persistence", new GetPersistence());
+ jcommander.addCommand("set-persistence", new SetPersistence());
+ jcommander.addCommand("remove-persistence", new RemovePersistence());
}
@Parameters(commandDescription = "Get the retention policy for a topic")
@@ -221,6 +226,72 @@ public class CmdTopicPolicies extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the persistence policies for a
topic")
+ private class GetPersistence extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to get
this policy globally. "
+ + "If set to true, broker returned global topic policies",
arity = 0)
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getPersistence(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set the persistence policies for a
topic")
+ private class SetPersistence extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-e",
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
+ private int bookkeeperEnsemble;
+
+ @Parameter(names = { "-w",
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
+ private int bookkeeperWriteQuorum;
+
+ @Parameter(names = { "-a",
+ "--bookkeeper-ack-quorum" }, description = "Number of acks
(guaranteed copies) to wait for each entry", required = true)
+ private int bookkeeperAckQuorum;
+
+ @Parameter(names = { "-r",
+ "--ml-mark-delete-max-rate" }, description = "Throttling rate
of mark-delete operation (0 means no throttle)", required = true)
+ private double managedLedgerMaxMarkDeleteRate;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ + "If set to true, the policy will be replicate to other
clusters asynchronously", arity = 0)
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
+ bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove the persistence policy for a
topic")
+ private class RemovePersistence extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to
remove this policy globally. "
+ + "If set to true, the removing operation will be replicate to
other clusters asynchronously"
+ , arity = 0)
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removePersistence(persistentTopic);
+ }
+ }
+
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9bd49ab..c228125 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -273,6 +273,10 @@ public class CmdTopics extends CmdBase {
cmdUsageFormatter.addDeprecatedCommand("get-backlog-quotas");
cmdUsageFormatter.addDeprecatedCommand("set-backlog-quota");
cmdUsageFormatter.addDeprecatedCommand("remove-backlog-quota");
+
+ cmdUsageFormatter.addDeprecatedCommand("get-persistence");
+ cmdUsageFormatter.addDeprecatedCommand("set-persistence");
+ cmdUsageFormatter.addDeprecatedCommand("remove-persistence");
}
}