This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e0187  Support get applied PersistencePolicies (#9831)
59e0187 is described below

commit 59e0187c547ff117e95e91fa67be2c283cfda2e4
Author: feynmanlin <[email protected]>
AuthorDate: Wed Mar 10 14:45:42 2021 +0800

    Support get applied PersistencePolicies (#9831)
    
    Master Issue: #9216
    
    ### Modifications
    Add applied API for topic-level
    Add remove API for namespace-level
    
    ### Verifying this change
    Verify the applied API and CMD
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 10 +++++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 20 +++++++++++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 10 +++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 22 +++++++--------
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 33 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 13 +++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 16 +++++++++++
 .../client/admin/internal/NamespacesImpl.java      | 21 ++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 15 ++++++++--
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 10 +++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++++++
 11 files changed, 168 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index dc9e261..2455d56 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1394,11 +1394,21 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
+    protected void internalDeletePersistence() {
+        validateNamespacePolicyOperation(namespaceName, 
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+        doUpdatePersistence(null);
+    }
+
     protected void internalSetPersistence(PersistencePolicies persistence) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         validatePersistencePolicies(persistence);
 
+        doUpdatePersistence(persistence);
+    }
+
+    private void doUpdatePersistence(PersistencePolicies persistence) {
         try {
             final String path = path(POLICIES, namespaceName.toString());
             updatePolicies(path, (policies)->{
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 69ca54f..dc77aba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2733,8 +2733,24 @@ public class PersistentTopicsBase extends AdminResource {
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
     }
 
-    protected Optional<PersistencePolicies> internalGetPersistence(){
-        return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
+    protected CompletableFuture<PersistencePolicies> 
internalGetPersistence(boolean applied) {
+        PersistencePolicies persistencePolicies = getTopicPolicies(topicName)
+                .map(TopicPolicies::getPersistence)
+                .orElseGet(() -> {
+                    if (applied) {
+                        PersistencePolicies namespacePolicy = 
getNamespacePolicies(namespaceName)
+                                .persistence;
+                        return namespacePolicy == null
+                                ? new PersistencePolicies(
+                                        
pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
+                                        
pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
+                                        
pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
+                                        
pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit())
+                                : namespacePolicy;
+                    }
+                    return null;
+                });
+        return CompletableFuture.completedFuture(persistencePolicies);
     }
 
     protected CompletableFuture<Void> 
internalSetPersistence(PersistencePolicies persistencePolicies) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 815063d..5044cf9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -755,6 +755,16 @@ public class Namespaces extends NamespacesBase {
         internalSetPersistence(persistence);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/persistence")
+    @ApiOperation(value = "Delete the persistence configuration for all topics 
on a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission") })
+    public void deletePersistence(@PathParam("tenant") String tenant,
+                                               @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalDeletePersistence();
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/persistence/bookieAffinity")
     @ApiOperation(value = "Set the bookie-affinity-group to 
namespace-persistent policy.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 498fa02..d286bca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1743,21 +1743,21 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     public void getPersistence(@Suspended final AsyncResponse asyncResponse,
                                @PathParam("tenant") String tenant,
                                @PathParam("namespace") String namespace,
-                               @PathParam("topic") @Encoded String 
encodedTopic) {
+                               @PathParam("topic") @Encoded String 
encodedTopic,
+                               @QueryParam("applied") boolean applied) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation();
-        try {
-            Optional<PersistencePolicies> persistencePolicies = 
internalGetPersistence();
-            if (!persistencePolicies.isPresent()) {
-                asyncResponse.resume(Response.noContent().build());
+        internalGetPersistence(applied).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed get persistence policies", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed get persistence policies", ex);
+                asyncResponse.resume(new RestException(ex));
             } else {
-                asyncResponse.resume(persistencePolicies.get());
+                asyncResponse.resume(res);
             }
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 381afd5..5e93430 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -468,6 +468,39 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
     }
 
+    @Test(timeOut = 20000)
+    public void testGetPersistenceApplied() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        assertNull(admin.topics().getPersistence(topic));
+        assertNull(admin.namespaces().getPersistence(myNamespace));
+        PersistencePolicies brokerPolicy
+                = new 
PersistencePolicies(pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(),
+                pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(),
+                pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(),
+                
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
+        PersistencePolicies namespacePolicy
+                = new PersistencePolicies(5,4,3,2);
+
+        admin.namespaces().setPersistence(myNamespace, namespacePolicy);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.namespaces().getPersistence(myNamespace)));
+        assertEquals(admin.topics().getPersistence(topic, true), 
namespacePolicy);
+
+        PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1);
+        admin.topics().setPersistence(topic, topicPolicy);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.topics().getPersistence(topic)));
+        assertEquals(admin.topics().getPersistence(topic, true), topicPolicy);
+
+        admin.namespaces().removePersistence(myNamespace);
+        admin.topics().removePersistence(topic);
+        Awaitility.await().untilAsserted(() -> 
assertNull(admin.namespaces().getPersistence(myNamespace)));
+        Awaitility.await().untilAsserted(() -> 
assertNull(admin.topics().getPersistence(topic)));
+        assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
+    }
+
     @Test
     public void testCheckPersistence() throws Exception {
         PersistencePolicies persistencePolicies = new PersistencePolicies(6, 
2, 2, 0.0);
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index b977ae2..9a6bc42 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1565,6 +1565,19 @@ public interface Namespaces {
     CompletableFuture<Void> removeBacklogQuotaAsync(String namespace);
 
     /**
+     * Remove the persistence configuration on a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removePersistence(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove the persistence configuration on a namespace asynchronously.
+     * @param namespace
+     */
+    CompletableFuture<Void> removePersistenceAsync(String namespace);
+
+    /**
      * Set the persistence configuration for all the topics on a namespace.
      * <p/>
      * Set the persistence configuration on a namespace.
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index b2a086c..881979f 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2140,6 +2140,22 @@ public interface Topics {
     CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic);
 
     /**
+     * Get the applied configuration of persistence policies for specified 
topic.
+     *
+     * @param topic Topic name
+     * @return Configuration of bookkeeper persistence policies
+     * @throws PulsarAdminException Unexpected error
+     */
+    PersistencePolicies getPersistence(String topic, boolean applied) throws 
PulsarAdminException;
+
+    /**
+     * Get the applied configuration of persistence policies for specified 
topic asynchronously.
+     *
+     * @param topic Topic name
+     */
+    CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic, 
boolean applied);
+
+    /**
      * Remove the configuration of persistence policies for specified topic.
      *
      * @param topic Topic name
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6b8b504..8786b95 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1197,6 +1197,27 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     }
 
     @Override
+    public void removePersistence(String namespace) throws 
PulsarAdminException {
+        try {
+            removePersistenceAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removePersistenceAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "persistence");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public void setPersistence(String namespace, PersistencePolicies 
persistence) throws PulsarAdminException {
         try {
             setPersistenceAsync(namespace, 
persistence).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 4a2627d..1a3135c 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2326,8 +2326,18 @@ public class TopicsImpl extends BaseResource implements 
Topics {
 
     @Override
     public PersistencePolicies getPersistence(String topic) throws 
PulsarAdminException {
+        return getPersistence(topic, false);
+    }
+
+    @Override
+    public CompletableFuture<PersistencePolicies> getPersistenceAsync(String 
topic) {
+        return getPersistenceAsync(topic, false);
+    }
+
+    @Override
+    public PersistencePolicies getPersistence(String topic, boolean applied) 
throws PulsarAdminException {
         try {
-            return getPersistenceAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+            return getPersistenceAsync(topic, applied).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -2339,9 +2349,10 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
-    public CompletableFuture<PersistencePolicies> getPersistenceAsync(String 
topic) {
+    public CompletableFuture<PersistencePolicies> getPersistenceAsync(String 
topic, boolean applied) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "persistence");
+        path = path.queryParam("applied", applied);
         final CompletableFuture<PersistencePolicies> future = new 
CompletableFuture<>();
         asyncGetRequest(path,
                 new InvocationCallback<PersistencePolicies>() {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 1c9a869..1c11db5 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -361,6 +361,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-persistence myprop/clust/ns1"));
         verify(mockNamespaces).getPersistence("myprop/clust/ns1");
 
+        namespaces.run(split("remove-persistence myprop/clust/ns1"));
+        verify(mockNamespaces).removePersistence("myprop/clust/ns1");
+
         namespaces.run(split("get-max-subscriptions-per-topic 
myprop/clust/ns1"));
         verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1");
         namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1 
-m 300"));
@@ -935,6 +938,13 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-max-subscriptions 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-persistence 
persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getPersistence("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 
-e 2 -w 1 -a 1 -r 100.0"));
+        verify(mockTopics).setPersistence("persistent://myprop/clust/ns1/ds1", 
new PersistencePolicies(2, 1, 1, 100.0d));
+        cmdTopics.run(split("remove-persistence 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).removePersistence("persistent://myprop/clust/ns1/ds1");
+
         // argument matcher for the timestamp in reset cursor. Since we can't 
verify exact timestamp, we check for a
         // range of +/- 1 second of the expected timestamp
         class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 18563d5..02f6654 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1070,6 +1070,18 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove the persistence policies for a 
namespace")
+    private class RemovePersistence extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().removePersistence(namespace);
+        }
+    }
+
     @Parameters(commandDescription = "Set the persistence policies for a 
namespace")
     private class SetPersistence extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -2057,6 +2069,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-persistence", new GetPersistence());
         jcommander.addCommand("set-persistence", new SetPersistence());
+        jcommander.addCommand("remove-persistence", new RemovePersistence());
 
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());

Reply via email to