This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 59e0187 Support get applied PersistencePolicies (#9831)
59e0187 is described below
commit 59e0187c547ff117e95e91fa67be2c283cfda2e4
Author: feynmanlin <[email protected]>
AuthorDate: Wed Mar 10 14:45:42 2021 +0800
Support get applied PersistencePolicies (#9831)
Master Issue: #9216
### Modifications
Add applied API for topic-level
Add remove API for namespace-level
### Verifying this change
Verify the applied API and CMD
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 10 +++++++
.../broker/admin/impl/PersistentTopicsBase.java | 20 +++++++++++--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 10 +++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 22 +++++++--------
.../pulsar/broker/admin/TopicPoliciesTest.java | 33 ++++++++++++++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 13 +++++++++
.../org/apache/pulsar/client/admin/Topics.java | 16 +++++++++++
.../client/admin/internal/NamespacesImpl.java | 21 ++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 15 ++++++++--
.../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++++++
11 files changed, 168 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index dc9e261..2455d56 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1394,11 +1394,21 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
+ protected void internalDeletePersistence() {
+ validateNamespacePolicyOperation(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+ doUpdatePersistence(null);
+ }
+
protected void internalSetPersistence(PersistencePolicies persistence) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
validatePersistencePolicies(persistence);
+ doUpdatePersistence(persistence);
+ }
+
+ private void doUpdatePersistence(PersistencePolicies persistence) {
try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies)->{
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 69ca54f..dc77aba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2733,8 +2733,24 @@ public class PersistentTopicsBase extends AdminResource {
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
}
- protected Optional<PersistencePolicies> internalGetPersistence(){
- return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
+ protected CompletableFuture<PersistencePolicies>
internalGetPersistence(boolean applied) {
+ PersistencePolicies persistencePolicies = getTopicPolicies(topicName)
+ .map(TopicPolicies::getPersistence)
+ .orElseGet(() -> {
+ if (applied) {
+ PersistencePolicies namespacePolicy =
getNamespacePolicies(namespaceName)
+ .persistence;
+ return namespacePolicy == null
+ ? new PersistencePolicies(
+
pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
+
pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
+
pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
+
pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit())
+ : namespacePolicy;
+ }
+ return null;
+ });
+ return CompletableFuture.completedFuture(persistencePolicies);
}
protected CompletableFuture<Void>
internalSetPersistence(PersistencePolicies persistencePolicies) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 815063d..5044cf9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -755,6 +755,16 @@ public class Namespaces extends NamespacesBase {
internalSetPersistence(persistence);
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/persistence")
+ @ApiOperation(value = "Delete the persistence configuration for all topics
on a namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ public void deletePersistence(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalDeletePersistence();
+ }
+
@POST
@Path("/{tenant}/{namespace}/persistence/bookieAffinity")
@ApiOperation(value = "Set the bookie-affinity-group to
namespace-persistent policy.")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 498fa02..d286bca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1743,21 +1743,21 @@ public class PersistentTopics extends
PersistentTopicsBase {
public void getPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String
encodedTopic) {
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
- try {
- Optional<PersistencePolicies> persistencePolicies =
internalGetPersistence();
- if (!persistencePolicies.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
+ internalGetPersistence(applied).whenComplete((res, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Failed get persistence policies", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed get persistence policies", ex);
+ asyncResponse.resume(new RestException(ex));
} else {
- asyncResponse.resume(persistencePolicies.get());
+ asyncResponse.resume(res);
}
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ });
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 381afd5..5e93430 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -468,6 +468,39 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
}
+ @Test(timeOut = 20000)
+ public void testGetPersistenceApplied() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ assertNull(admin.topics().getPersistence(topic));
+ assertNull(admin.namespaces().getPersistence(myNamespace));
+ PersistencePolicies brokerPolicy
+ = new
PersistencePolicies(pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(),
+ pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(),
+ pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
+ PersistencePolicies namespacePolicy
+ = new PersistencePolicies(5,4,3,2);
+
+ admin.namespaces().setPersistence(myNamespace, namespacePolicy);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getPersistence(myNamespace)));
+ assertEquals(admin.topics().getPersistence(topic, true),
namespacePolicy);
+
+ PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1);
+ admin.topics().setPersistence(topic, topicPolicy);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getPersistence(topic)));
+ assertEquals(admin.topics().getPersistence(topic, true), topicPolicy);
+
+ admin.namespaces().removePersistence(myNamespace);
+ admin.topics().removePersistence(topic);
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getPersistence(myNamespace)));
+ Awaitility.await().untilAsserted(() ->
assertNull(admin.topics().getPersistence(topic)));
+ assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
+ }
+
@Test
public void testCheckPersistence() throws Exception {
PersistencePolicies persistencePolicies = new PersistencePolicies(6,
2, 2, 0.0);
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index b977ae2..9a6bc42 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1565,6 +1565,19 @@ public interface Namespaces {
CompletableFuture<Void> removeBacklogQuotaAsync(String namespace);
/**
+ * Remove the persistence configuration on a namespace.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removePersistence(String namespace) throws PulsarAdminException;
+
+ /**
+ * Remove the persistence configuration on a namespace asynchronously.
+ * @param namespace
+ */
+ CompletableFuture<Void> removePersistenceAsync(String namespace);
+
+ /**
* Set the persistence configuration for all the topics on a namespace.
* <p/>
* Set the persistence configuration on a namespace.
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index b2a086c..881979f 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2140,6 +2140,22 @@ public interface Topics {
CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic);
/**
+ * Get the applied configuration of persistence policies for specified
topic.
+ *
+ * @param topic Topic name
+ * @return Configuration of bookkeeper persistence policies
+ * @throws PulsarAdminException Unexpected error
+ */
+ PersistencePolicies getPersistence(String topic, boolean applied) throws
PulsarAdminException;
+
+ /**
+ * Get the applied configuration of persistence policies for specified
topic asynchronously.
+ *
+ * @param topic Topic name
+ */
+ CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic,
boolean applied);
+
+ /**
* Remove the configuration of persistence policies for specified topic.
*
* @param topic Topic name
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6b8b504..8786b95 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1197,6 +1197,27 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public void removePersistence(String namespace) throws
PulsarAdminException {
+ try {
+ removePersistenceAsync(namespace).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removePersistenceAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public void setPersistence(String namespace, PersistencePolicies
persistence) throws PulsarAdminException {
try {
setPersistenceAsync(namespace,
persistence).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 4a2627d..1a3135c 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2326,8 +2326,18 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public PersistencePolicies getPersistence(String topic) throws
PulsarAdminException {
+ return getPersistence(topic, false);
+ }
+
+ @Override
+ public CompletableFuture<PersistencePolicies> getPersistenceAsync(String
topic) {
+ return getPersistenceAsync(topic, false);
+ }
+
+ @Override
+ public PersistencePolicies getPersistence(String topic, boolean applied)
throws PulsarAdminException {
try {
- return getPersistenceAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ return getPersistenceAsync(topic, applied).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
@@ -2339,9 +2349,10 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
- public CompletableFuture<PersistencePolicies> getPersistenceAsync(String
topic) {
+ public CompletableFuture<PersistencePolicies> getPersistenceAsync(String
topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "persistence");
+ path = path.queryParam("applied", applied);
final CompletableFuture<PersistencePolicies> future = new
CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PersistencePolicies>() {
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 1c9a869..1c11db5 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -361,6 +361,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-persistence myprop/clust/ns1"));
verify(mockNamespaces).getPersistence("myprop/clust/ns1");
+ namespaces.run(split("remove-persistence myprop/clust/ns1"));
+ verify(mockNamespaces).removePersistence("myprop/clust/ns1");
+
namespaces.run(split("get-max-subscriptions-per-topic
myprop/clust/ns1"));
verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1");
namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1
-m 300"));
@@ -935,6 +938,13 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-max-subscriptions
persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-persistence
persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getPersistence("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1
-e 2 -w 1 -a 1 -r 100.0"));
+ verify(mockTopics).setPersistence("persistent://myprop/clust/ns1/ds1",
new PersistencePolicies(2, 1, 1, 100.0d));
+ cmdTopics.run(split("remove-persistence
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).removePersistence("persistent://myprop/clust/ns1/ds1");
+
// argument matcher for the timestamp in reset cursor. Since we can't
verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 18563d5..02f6654 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1070,6 +1070,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove the persistence policies for a
namespace")
+ private class RemovePersistence extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ getAdmin().namespaces().removePersistence(namespace);
+ }
+ }
+
@Parameters(commandDescription = "Set the persistence policies for a
namespace")
private class SetPersistence extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -2057,6 +2069,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
+ jcommander.addCommand("remove-persistence", new RemovePersistence());
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());